Posted to commits@hive.apache.org by jd...@apache.org on 2016/05/06 17:24:52 UTC
[02/39] hive git commit: HIVE-13138. Add client to communicate with
interface, initial split setup. (Siddharth Seth and Vikram Dixit K)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f272acea
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f272acea
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f272acea
Branch: refs/heads/master
Commit: f272aceaf7da77f9d87f5be42bb1520181035c2c
Parents: bf83407
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Feb 23 23:55:46 2016 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Feb 23 23:55:46 2016 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 3 +
.../org/apache/hive/jdbc/LlapInputFormat.java | 10 +
.../daemon/rpc/LlapDaemonProtocolProtos.java | 159 ++++++++++---
.../src/protobuf/LlapDaemonProtocol.proto | 8 +
.../hive/llap/daemon/impl/LlapDaemon.java | 3 +
.../llap/daemon/impl/TaskRunnerCallable.java | 5 +-
.../ext/LlapTaskUmbilicalExternalClient.java | 197 ++++++++++++++++
.../helpers/LlapTaskUmbilicalServer.java | 57 +++++
.../hadoop/hive/llap/LlapInputFormat.java | 146 +++---------
.../apache/hadoop/hive/llap/LlapInputSplit.java | 80 ++++---
.../apache/hadoop/hive/llap/SubmitWorkInfo.java | 65 ++++++
.../hive/ql/exec/tez/HiveSplitGenerator.java | 49 +++-
.../hive/ql/exec/tez/MapRecordProcessor.java | 2 +
.../hive/ql/parse/TypeCheckProcFactory.java | 3 +
.../ql/udf/generic/GenericUDFGetSplits.java | 224 +++++++++++++++++--
.../org/apache/tez/dag/api/TaskSpecBuilder.java | 45 ++++
16 files changed, 837 insertions(+), 219 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7fbcbba..6a22890 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2687,6 +2687,9 @@ public class HiveConf extends Configuration {
LLAP_DAEMON_OUTPUT_SERVICE_PORT("hive.llap.daemon.output.service.port", 15003,
"LLAP daemon output service port"),
+ LLAP_TMP_SUBMITWORK_USING_TEZ_AM("hive.llap.tmp.submit.work.using.tez.am", true, ""),
+ LLAP_TMP_EXT_CLIENT_NUM_SERVER_HANDLERS("hive.llap.tmp.ext.client.num.server.handlers", 1, ""),
+
SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
"60s", new TimeValidator(TimeUnit.SECONDS),
"Timeout for requests from Hive client to remote Spark driver."),
http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java b/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
index 97fe2c5..c38dd82 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
@@ -59,6 +59,11 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
private String pwd; // ""
private String query;
+ public final String URL_KEY = "llap.if.hs2.connection";
+ public final String QUERY_KEY = "llap.if.query";
+ public final String USER_KEY = "llap.if.user";
+ public final String PWD_KEY = "llap.if.pwd";
+
private Connection con;
private Statement stmt;
@@ -133,6 +138,11 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
List<InputSplit> ins = new ArrayList<InputSplit>();
+ if (url == null) url = job.get(URL_KEY);
+ if (query == null) query = job.get(QUERY_KEY);
+ if (user == null) user = job.get(USER_KEY);
+ if (pwd == null) pwd = job.get(PWD_KEY);
+
if (url == null || query == null) {
throw new IllegalStateException();
}
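With these keys in place, a caller that goes through the empty bean constructor can drive
getSplits() purely from the JobConf. A hedged sketch (the connection values are
illustrative placeholders):

    import org.apache.hadoop.mapred.JobConf;

    public class LlapIfConfExample {
      public static JobConf configure() {
        JobConf job = new JobConf();
        // Key names match the constants added above; values are placeholders.
        job.set("llap.if.hs2.connection", "jdbc:hive2://localhost:10000/default");
        job.set("llap.if.query", "SELECT * FROM t");
        job.set("llap.if.user", "hive");
        job.set("llap.if.pwd", "");
        return job;
      }
    }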
http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
----------------------------------------------------------------------
diff --git a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
index 4ab7b32..653e7e0 100644
--- a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
+++ b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
@@ -7334,6 +7334,16 @@ public final class LlapDaemonProtocolProtos {
* <code>optional .FragmentRuntimeInfo fragment_runtime_info = 10;</code>
*/
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfoOrBuilder getFragmentRuntimeInfoOrBuilder();
+
+ // optional bool usingTezAm = 11 [default = true];
+ /**
+ * <code>optional bool usingTezAm = 11 [default = true];</code>
+ */
+ boolean hasUsingTezAm();
+ /**
+ * <code>optional bool usingTezAm = 11 [default = true];</code>
+ */
+ boolean getUsingTezAm();
}
/**
* Protobuf type {@code SubmitWorkRequestProto}
@@ -7452,6 +7462,11 @@ public final class LlapDaemonProtocolProtos {
bitField0_ |= 0x00000200;
break;
}
+ case 88: {
+ bitField0_ |= 0x00000400;
+ usingTezAm_ = input.readBool();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -7799,6 +7814,22 @@ public final class LlapDaemonProtocolProtos {
return fragmentRuntimeInfo_;
}
+ // optional bool usingTezAm = 11 [default = true];
+ public static final int USINGTEZAM_FIELD_NUMBER = 11;
+ private boolean usingTezAm_;
+ /**
+ * <code>optional bool usingTezAm = 11 [default = true];</code>
+ */
+ public boolean hasUsingTezAm() {
+ return ((bitField0_ & 0x00000400) == 0x00000400);
+ }
+ /**
+ * <code>optional bool usingTezAm = 11 [default = true];</code>
+ */
+ public boolean getUsingTezAm() {
+ return usingTezAm_;
+ }
+
private void initFields() {
containerIdString_ = "";
amHost_ = "";
@@ -7810,6 +7841,7 @@ public final class LlapDaemonProtocolProtos {
appAttemptNumber_ = 0;
fragmentSpec_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto.getDefaultInstance();
fragmentRuntimeInfo_ = org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo.getDefaultInstance();
+ usingTezAm_ = true;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -7853,6 +7885,9 @@ public final class LlapDaemonProtocolProtos {
if (((bitField0_ & 0x00000200) == 0x00000200)) {
output.writeMessage(10, fragmentRuntimeInfo_);
}
+ if (((bitField0_ & 0x00000400) == 0x00000400)) {
+ output.writeBool(11, usingTezAm_);
+ }
getUnknownFields().writeTo(output);
}
@@ -7902,6 +7937,10 @@ public final class LlapDaemonProtocolProtos {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(10, fragmentRuntimeInfo_);
}
+ if (((bitField0_ & 0x00000400) == 0x00000400)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBoolSize(11, usingTezAm_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -7975,6 +8014,11 @@ public final class LlapDaemonProtocolProtos {
result = result && getFragmentRuntimeInfo()
.equals(other.getFragmentRuntimeInfo());
}
+ result = result && (hasUsingTezAm() == other.hasUsingTezAm());
+ if (hasUsingTezAm()) {
+ result = result && (getUsingTezAm()
+ == other.getUsingTezAm());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -8028,6 +8072,10 @@ public final class LlapDaemonProtocolProtos {
hash = (37 * hash) + FRAGMENT_RUNTIME_INFO_FIELD_NUMBER;
hash = (53 * hash) + getFragmentRuntimeInfo().hashCode();
}
+ if (hasUsingTezAm()) {
+ hash = (37 * hash) + USINGTEZAM_FIELD_NUMBER;
+ hash = (53 * hash) + hashBoolean(getUsingTezAm());
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -8167,6 +8215,8 @@ public final class LlapDaemonProtocolProtos {
fragmentRuntimeInfoBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000200);
+ usingTezAm_ = true;
+ bitField0_ = (bitField0_ & ~0x00000400);
return this;
}
@@ -8243,6 +8293,10 @@ public final class LlapDaemonProtocolProtos {
} else {
result.fragmentRuntimeInfo_ = fragmentRuntimeInfoBuilder_.build();
}
+ if (((from_bitField0_ & 0x00000400) == 0x00000400)) {
+ to_bitField0_ |= 0x00000400;
+ }
+ result.usingTezAm_ = usingTezAm_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -8299,6 +8353,9 @@ public final class LlapDaemonProtocolProtos {
if (other.hasFragmentRuntimeInfo()) {
mergeFragmentRuntimeInfo(other.getFragmentRuntimeInfo());
}
+ if (other.hasUsingTezAm()) {
+ setUsingTezAm(other.getUsingTezAm());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -9032,6 +9089,39 @@ public final class LlapDaemonProtocolProtos {
return fragmentRuntimeInfoBuilder_;
}
+ // optional bool usingTezAm = 11 [default = true];
+ private boolean usingTezAm_ = true;
+ /**
+ * <code>optional bool usingTezAm = 11 [default = true];</code>
+ */
+ public boolean hasUsingTezAm() {
+ return ((bitField0_ & 0x00000400) == 0x00000400);
+ }
+ /**
+ * <code>optional bool usingTezAm = 11 [default = true];</code>
+ */
+ public boolean getUsingTezAm() {
+ return usingTezAm_;
+ }
+ /**
+ * <code>optional bool usingTezAm = 11 [default = true];</code>
+ */
+ public Builder setUsingTezAm(boolean value) {
+ bitField0_ |= 0x00000400;
+ usingTezAm_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional bool usingTezAm = 11 [default = true];</code>
+ */
+ public Builder clearUsingTezAm() {
+ bitField0_ = (bitField0_ & ~0x00000400);
+ usingTezAm_ = true;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:SubmitWorkRequestProto)
}
@@ -14392,7 +14482,7 @@ public final class LlapDaemonProtocolProtos {
"\030\004 \001(\003\022 \n\030first_attempt_start_time\030\005 \001(\003" +
"\022\"\n\032current_attempt_start_time\030\006 \001(\003\"F\n\024" +
"QueryIdentifierProto\022\026\n\016app_identifier\030\001" +
- " \001(\t\022\026\n\016dag_identifier\030\002 \001(\005\"\266\002\n\026SubmitW" +
+ " \001(\t\022\026\n\016dag_identifier\030\002 \001(\005\"\320\002\n\026SubmitW" +
"orkRequestProto\022\033\n\023container_id_string\030\001" +
" \001(\t\022\017\n\007am_host\030\002 \001(\t\022\017\n\007am_port\030\003 \001(\005\022\030",
"\n\020token_identifier\030\004 \001(\t\022\032\n\022credentials_" +
@@ -14400,38 +14490,39 @@ public final class LlapDaemonProtocolProtos {
"n_id_string\030\007 \001(\t\022\032\n\022app_attempt_number\030" +
"\010 \001(\005\022)\n\rfragment_spec\030\t \001(\0132\022.FragmentS" +
"pecProto\0223\n\025fragment_runtime_info\030\n \001(\0132" +
- "\024.FragmentRuntimeInfo\"J\n\027SubmitWorkRespo" +
- "nseProto\022/\n\020submission_state\030\001 \001(\0162\025.Sub" +
- "missionStateProto\"\205\001\n\036SourceStateUpdated" +
- "RequestProto\022/\n\020query_identifier\030\001 \001(\0132\025" +
- ".QueryIdentifierProto\022\020\n\010src_name\030\002 \001(\t\022",
- " \n\005state\030\003 \001(\0162\021.SourceStateProto\"!\n\037Sou" +
- "rceStateUpdatedResponseProto\"w\n\031QueryCom" +
- "pleteRequestProto\022\020\n\010query_id\030\001 \001(\t\022/\n\020q" +
- "uery_identifier\030\002 \001(\0132\025.QueryIdentifierP" +
- "roto\022\027\n\014delete_delay\030\004 \001(\003:\0010\"\034\n\032QueryCo" +
- "mpleteResponseProto\"t\n\035TerminateFragment" +
- "RequestProto\022/\n\020query_identifier\030\001 \001(\0132\025" +
- ".QueryIdentifierProto\022\"\n\032fragment_identi" +
- "fier_string\030\002 \001(\t\" \n\036TerminateFragmentRe" +
- "sponseProto\"\026\n\024GetTokenRequestProto\"&\n\025G",
- "etTokenResponseProto\022\r\n\005token\030\001 \001(\014*2\n\020S" +
- "ourceStateProto\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RU" +
- "NNING\020\002*E\n\024SubmissionStateProto\022\014\n\010ACCEP" +
- "TED\020\001\022\014\n\010REJECTED\020\002\022\021\n\rEVICTED_OTHER\020\0032\316" +
- "\002\n\022LlapDaemonProtocol\022?\n\nsubmitWork\022\027.Su" +
- "bmitWorkRequestProto\032\030.SubmitWorkRespons" +
- "eProto\022W\n\022sourceStateUpdated\022\037.SourceSta" +
- "teUpdatedRequestProto\032 .SourceStateUpdat" +
- "edResponseProto\022H\n\rqueryComplete\022\032.Query" +
- "CompleteRequestProto\032\033.QueryCompleteResp",
- "onseProto\022T\n\021terminateFragment\022\036.Termina" +
- "teFragmentRequestProto\032\037.TerminateFragme" +
- "ntResponseProto2]\n\026LlapManagementProtoco" +
- "l\022C\n\022getDelegationToken\022\025.GetTokenReques" +
- "tProto\032\026.GetTokenResponseProtoBH\n&org.ap" +
- "ache.hadoop.hive.llap.daemon.rpcB\030LlapDa" +
- "emonProtocolProtos\210\001\001\240\001\001"
+ "\024.FragmentRuntimeInfo\022\030\n\nusingTezAm\030\013 \001(" +
+ "\010:\004true\"J\n\027SubmitWorkResponseProto\022/\n\020su" +
+ "bmission_state\030\001 \001(\0162\025.SubmissionStatePr" +
+ "oto\"\205\001\n\036SourceStateUpdatedRequestProto\022/" +
+ "\n\020query_identifier\030\001 \001(\0132\025.QueryIdentifi",
+ "erProto\022\020\n\010src_name\030\002 \001(\t\022 \n\005state\030\003 \001(\016" +
+ "2\021.SourceStateProto\"!\n\037SourceStateUpdate" +
+ "dResponseProto\"w\n\031QueryCompleteRequestPr" +
+ "oto\022\020\n\010query_id\030\001 \001(\t\022/\n\020query_identifie" +
+ "r\030\002 \001(\0132\025.QueryIdentifierProto\022\027\n\014delete" +
+ "_delay\030\004 \001(\003:\0010\"\034\n\032QueryCompleteResponse" +
+ "Proto\"t\n\035TerminateFragmentRequestProto\022/" +
+ "\n\020query_identifier\030\001 \001(\0132\025.QueryIdentifi" +
+ "erProto\022\"\n\032fragment_identifier_string\030\002 " +
+ "\001(\t\" \n\036TerminateFragmentResponseProto\"\026\n",
+ "\024GetTokenRequestProto\"&\n\025GetTokenRespons" +
+ "eProto\022\r\n\005token\030\001 \001(\014*2\n\020SourceStateProt" +
+ "o\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RUNNING\020\002*E\n\024Sub" +
+ "missionStateProto\022\014\n\010ACCEPTED\020\001\022\014\n\010REJEC" +
+ "TED\020\002\022\021\n\rEVICTED_OTHER\020\0032\316\002\n\022LlapDaemonP" +
+ "rotocol\022?\n\nsubmitWork\022\027.SubmitWorkReques" +
+ "tProto\032\030.SubmitWorkResponseProto\022W\n\022sour" +
+ "ceStateUpdated\022\037.SourceStateUpdatedReque" +
+ "stProto\032 .SourceStateUpdatedResponseProt" +
+ "o\022H\n\rqueryComplete\022\032.QueryCompleteReques",
+ "tProto\032\033.QueryCompleteResponseProto\022T\n\021t" +
+ "erminateFragment\022\036.TerminateFragmentRequ" +
+ "estProto\032\037.TerminateFragmentResponseProt" +
+ "o2]\n\026LlapManagementProtocol\022C\n\022getDelega" +
+ "tionToken\022\025.GetTokenRequestProto\032\026.GetTo" +
+ "kenResponseProtoBH\n&org.apache.hadoop.hi" +
+ "ve.llap.daemon.rpcB\030LlapDaemonProtocolPr" +
+ "otos\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -14485,7 +14576,7 @@ public final class LlapDaemonProtocolProtos {
internal_static_SubmitWorkRequestProto_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_SubmitWorkRequestProto_descriptor,
- new java.lang.String[] { "ContainerIdString", "AmHost", "AmPort", "TokenIdentifier", "CredentialsBinary", "User", "ApplicationIdString", "AppAttemptNumber", "FragmentSpec", "FragmentRuntimeInfo", });
+ new java.lang.String[] { "ContainerIdString", "AmHost", "AmPort", "TokenIdentifier", "CredentialsBinary", "User", "ApplicationIdString", "AppAttemptNumber", "FragmentSpec", "FragmentRuntimeInfo", "UsingTezAm", });
internal_static_SubmitWorkResponseProto_descriptor =
getDescriptor().getMessageTypes().get(8);
internal_static_SubmitWorkResponseProto_fieldAccessorTable = new
http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/llap-common/src/protobuf/LlapDaemonProtocol.proto
----------------------------------------------------------------------
diff --git a/llap-common/src/protobuf/LlapDaemonProtocol.proto b/llap-common/src/protobuf/LlapDaemonProtocol.proto
index 944c96c..e964c5f 100644
--- a/llap-common/src/protobuf/LlapDaemonProtocol.proto
+++ b/llap-common/src/protobuf/LlapDaemonProtocol.proto
@@ -91,6 +91,7 @@ message SubmitWorkRequestProto {
optional int32 app_attempt_number = 8;
optional FragmentSpecProto fragment_spec = 9;
optional FragmentRuntimeInfo fragment_runtime_info = 10;
+ optional bool usingTezAm = 11 [default = true];
}
enum SubmissionStateProto {
@@ -136,11 +137,18 @@ message GetTokenResponseProto {
optional bytes token = 1;
}
+message SendEventsRequestProto {
+}
+
+message SendEventsResponseProto {
+}
+
service LlapDaemonProtocol {
rpc submitWork(SubmitWorkRequestProto) returns (SubmitWorkResponseProto);
rpc sourceStateUpdated(SourceStateUpdatedRequestProto) returns (SourceStateUpdatedResponseProto);
rpc queryComplete(QueryCompleteRequestProto) returns (QueryCompleteResponseProto);
rpc terminateFragment(TerminateFragmentRequestProto) returns (TerminateFragmentResponseProto);
+ rpc sendEvents(SendEventsRequestProto) returns (SendEventsResponseProto);
}
service LlapManagementProtocol {
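Since field 11 is optional with [default = true], existing senders that never set it keep
the old behavior, while the external-client path can opt out explicitly. A small sketch
against the generated builder shown earlier:

    import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;

    public class UsingTezAmExample {
      public static void main(String[] args) {
        // Field unset: hasUsingTezAm() is false, getUsingTezAm() falls back to the default, true.
        SubmitWorkRequestProto implicit = SubmitWorkRequestProto.newBuilder().build();
        System.out.println(implicit.hasUsingTezAm() + " " + implicit.getUsingTezAm());

        // External-client path: the flag is set to false explicitly.
        SubmitWorkRequestProto external =
            SubmitWorkRequestProto.newBuilder().setUsingTezAm(false).build();
        System.out.println(external.hasUsingTezAm() + " " + external.getUsingTezAm());
      }
    }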
http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 8621826..40a89cb 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -14,6 +14,7 @@
package org.apache.hadoop.hive.llap.daemon.impl;
+import org.apache.hadoop.hive.llap.LlapOutputFormatService;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
@@ -279,6 +280,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
LOG.info("Setting shuffle port to: " + ShuffleHandler.get().getPort());
this.shufflePort.set(ShuffleHandler.get().getPort());
super.serviceStart();
+ LlapOutputFormatService.get();
LOG.info("LlapDaemon serviceStart complete");
}
@@ -286,6 +288,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
super.serviceStop();
ShuffleHandler.shutdown();
shutdown();
+ LlapOutputFormatService.get().stop();
LOG.info("LlapDaemon shutdown complete");
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index d88d82a..d9d216d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -103,6 +103,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
private final String queryId;
private final HadoopShim tezHadoopShim;
private boolean shouldRunTask = true;
+ private final boolean withTezAm;
final Stopwatch runtimeWatch = new Stopwatch();
final Stopwatch killtimerWatch = new Stopwatch();
private final AtomicBoolean isStarted = new AtomicBoolean(false);
@@ -131,11 +132,11 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
this.jobToken = TokenCache.getSessionToken(credentials);
this.taskSpec = Converters.getTaskSpecfromProto(request.getFragmentSpec());
this.amReporter = amReporter;
+ this.withTezAm = request.getUsingTezAm();
+ LOG.warn("ZZZ: DBG: usingTezAm=" + withTezAm);
// Register with the AMReporter when the callable is setup. Unregister once it starts running.
- if (jobToken != null) {
this.amReporter.registerTask(request.getAmHost(), request.getAmPort(),
request.getUser(), jobToken, fragmentInfo.getQueryInfo().getQueryIdentifier());
- }
this.metrics = metrics;
this.requestId = request.getFragmentSpec().getFragmentIdentifierString();
// TODO Change this to the queryId/Name when that's available.
http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
new file mode 100644
index 0000000..ecc032d
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -0,0 +1,197 @@
+package org.apache.hadoop.hive.llap.ext;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
+import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy;
+import org.apache.hadoop.hive.llap.tezplugins.helpers.LlapTaskUmbilicalServer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LlapTaskUmbilicalExternalClient extends AbstractService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LlapTaskUmbilicalExternalClient.class);
+
+ private final LlapProtocolClientProxy communicator;
+ private volatile LlapTaskUmbilicalServer llapTaskUmbilicalServer;
+ private final Configuration conf;
+ private final LlapTaskUmbilicalProtocol umbilical;
+
+ protected final String tokenIdentifier;
+ protected final Token<JobTokenIdentifier> sessionToken;
+
+
+ private final ConcurrentMap<String, List<TezEvent>> pendingEvents = new ConcurrentHashMap<>();
+
+
+ // TODO KKK Work out the details of the tokenIdentifier, and the session token.
+ // It may just be possible to create one here - since Shuffle is not involved, and this is only used
+ // for communication from LLAP-Daemons to the server. It will need to be sent in as part
+ // of the job submission request.
+ public LlapTaskUmbilicalExternalClient(Configuration conf, String tokenIdentifier, Token<JobTokenIdentifier> sessionToken) {
+ super(LlapTaskUmbilicalExternalClient.class.getName());
+ this.conf = conf;
+ this.umbilical = new LlapTaskUmbilicalExternalImpl();
+ this.tokenIdentifier = tokenIdentifier;
+ this.sessionToken = sessionToken;
+ // TODO. No support for the LLAP token yet. Add support for configurable threads; however, 1 should always be enough.
+ this.communicator = new LlapProtocolClientProxy(1, conf, null);
+ }
+
+ @Override
+ public void serviceStart() throws IOException {
+ int numHandlers = HiveConf.getIntVar(conf,
+ HiveConf.ConfVars.LLAP_TMP_EXT_CLIENT_NUM_SERVER_HANDLERS);
+ llapTaskUmbilicalServer = new LlapTaskUmbilicalServer(conf, umbilical, numHandlers, tokenIdentifier, sessionToken);
+ }
+
+ @Override
+ public void serviceStop() {
+ llapTaskUmbilicalServer.shutdownServer();
+ if (this.communicator != null) {
+ this.communicator.stop();
+ }
+ }
+
+
+ /**
+ * Submit the work for actual execution. This should always have the usingTezAm flag disabled.
+ * @param submitWorkRequestProto
+ */
+ public void submitWork(final SubmitWorkRequestProto submitWorkRequestProto, String llapHost, int llapPort) {
+ Preconditions.checkArgument(submitWorkRequestProto.getUsingTezAm() == false);
+
+ // Store the actual event first. To be returned on the first heartbeat.
+ Event mrInputEvent = null;
+ // Construct a TezEvent out of this, to send it out on the next heartbeat
+
+// submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString()
+
+
+ // Send out the actual SubmitWorkRequest
+ communicator.sendSubmitWork(submitWorkRequestProto, llapHost, llapPort,
+ new LlapProtocolClientProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.SubmitWorkResponseProto>() {
+ @Override
+ public void setResponse(LlapDaemonProtocolProtos.SubmitWorkResponseProto response) {
+ if (response.hasSubmissionState()) {
+ if (response.getSubmissionState().equals(LlapDaemonProtocolProtos.SubmissionStateProto.REJECTED)) {
+ LOG.info("Fragment: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString() + " rejected. Server Busy.");
+ return;
+ }
+ }
+ LOG.info("DBG: Submitted " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString());
+ }
+
+ @Override
+ public void indicateError(Throwable t) {
+ LOG.error("Failed to submit: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(), t);
+ }
+ });
+
+
+
+
+// // TODO Also send out information saying that the fragment is finishable - if that is not already included in the main fragment.
+// // This entire call is only required if we're doing more than scans. MRInput has no dependencies and is always finishable
+// QueryIdentifierProto queryIdentifier = QueryIdentifierProto
+// .newBuilder()
+// .setAppIdentifier(submitWorkRequestProto.getApplicationIdString()).setDagIdentifier(submitWorkRequestProto.getFragmentSpec().getDagId())
+// .build();
+// LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto sourceStateUpdatedRequest =
+// LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.newBuilder().setQueryIdentifier(queryIdentifier).setState(
+// LlapDaemonProtocolProtos.SourceStateProto.S_SUCCEEDED).
+// setSrcName(TODO)
+// communicator.sendSourceStateUpdate(LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto.newBuilder().setQueryIdentifier(submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString()).set);
+
+
+ }
+
+
+
+
+
+
+
+ // TODO Ideally, the server should be shared across all client sessions running on the same node.
+ private class LlapTaskUmbilicalExternalImpl implements LlapTaskUmbilicalProtocol {
+
+ @Override
+ public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
+ // Expecting only a single instance of a task to be running.
+ return true;
+ }
+
+ @Override
+ public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException,
+ TezException {
+ // Keep-alive information. The client should be informed and will have to take care of re-submitting the work.
+ // Some parts of fault tolerance go here.
+
+ // This also provides completion information, and a possible notification when task actually starts running (first heartbeat)
+
+ // Incoming events can be ignored until shuffle, rather than just scans, needs to be handled.
+
+
+ TezHeartbeatResponse response = new TezHeartbeatResponse();
+ // Assuming TaskAttemptId and FragmentIdentifierString are the same. Verify this.
+ TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID();
+ LOG.info("ZZZ: DBG: Received heartbeat from taskAttemptId: " + taskAttemptId.toString());
+
+ List<TezEvent> tezEvents = pendingEvents.remove(taskAttemptId.toString());
+
+ response.setLastRequestId(request.getRequestId());
+ // The from-event ids are irrelevant here; they can be tracked in the AM itself, instead of polluting the task.
+ // Also since we have all the MRInput events here - they'll all be sent in together.
+ response.setNextFromEventId(0); // Irrelevant. See comment above.
+ response.setNextPreRoutedEventId(0); //Irrelevant. See comment above.
+ response.setEvents(tezEvents);
+
+ // TODO KKK: Should ideally handle things like Task success notifications.
+ // Can this somehow be hooked into the LlapTaskCommunicator to make testing easy
+
+ return response;
+ }
+
+ @Override
+ public void nodeHeartbeat(Text hostname, int port) throws IOException {
+ // TODO Eventually implement - to handle keep-alive messages from pending work.
+ }
+
+ @Override
+ public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException {
+ // TODO Eventually implement - to handle preemptions within LLAP daemons.
+ }
+
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
+ return 0;
+ }
+
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
+ int clientMethodsHash) throws IOException {
+ return ProtocolSignature.getProtocolSignature(this, protocol,
+ clientVersion, clientMethodsHash);
+ }
+ }
+
+}
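Putting the pieces together, the intended call sequence for the external client appears to
be: construct with a token identifier and session token, start the service (which brings up
the umbilical server), then submit fragments with usingTezAm disabled. A hedged sketch;
request construction is elided and the token identifier is a placeholder:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
    import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient;
    import org.apache.hadoop.security.token.Token;
    import org.apache.tez.common.security.JobTokenIdentifier;

    public class ExternalClientExample {
      public static void run(Configuration conf, Token<JobTokenIdentifier> sessionToken,
          SubmitWorkRequestProto request, String llapHost, int llapPort) {
        LlapTaskUmbilicalExternalClient client =
            new LlapTaskUmbilicalExternalClient(conf, "token-id", sessionToken);
        client.init(conf);   // AbstractService lifecycle
        client.start();      // starts the LlapTaskUmbilicalServer
        try {
          // The request must have been built with setUsingTezAm(false),
          // or the Preconditions check in submitWork() fails.
          client.submitWork(request, llapHost, llapPort);
        } finally {
          client.stop();
        }
      }
    }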
http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
new file mode 100644
index 0000000..dbd591a
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/LlapTaskUmbilicalServer.java
@@ -0,0 +1,57 @@
+package org.apache.hadoop.hive.llap.tezplugins.helpers;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LlapTaskUmbilicalServer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LlapTaskUmbilicalServer.class);
+
+ protected volatile Server server;
+ private final InetSocketAddress address;
+ private final AtomicBoolean started = new AtomicBoolean(true);
+
+ public LlapTaskUmbilicalServer(Configuration conf, LlapTaskUmbilicalProtocol umbilical, int numHandlers, String tokenIdentifier, Token<JobTokenIdentifier> token) throws
+ IOException {
+ JobTokenSecretManager jobTokenSecretManager =
+ new JobTokenSecretManager();
+ jobTokenSecretManager.addTokenForJob(tokenIdentifier, token);
+
+ server = new RPC.Builder(conf)
+ .setProtocol(LlapTaskUmbilicalProtocol.class)
+ .setBindAddress("0.0.0.0")
+ .setPort(0)
+ .setInstance(umbilical)
+ .setNumHandlers(numHandlers)
+ .setSecretManager(jobTokenSecretManager).build();
+
+ server.start();
+ this.address = NetUtils.getConnectAddress(server);
+ LOG.info(
+ "Started TaskUmbilicalServer: " + umbilical.getClass().getName() + " at address: " + address +
+ " with numHandlers=" + numHandlers);
+ }
+
+ public InetSocketAddress getAddress() {
+ return this.address;
+ }
+
+ public void shutdownServer() {
+ if (started.get()) { // Primarily to avoid multiple shutdowns.
+ started.set(false);
+ server.stop();
+ }
+ }
+}
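The server wraps a Hadoop RPC endpoint bound to an ephemeral port; callers retrieve the
actual address after construction. A small usage sketch (the umbilical instance is whatever
LlapTaskUmbilicalProtocol implementation the caller provides; the token identifier is a
placeholder):

    import java.net.InetSocketAddress;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
    import org.apache.hadoop.hive.llap.tezplugins.helpers.LlapTaskUmbilicalServer;
    import org.apache.hadoop.security.token.Token;
    import org.apache.tez.common.security.JobTokenIdentifier;

    public class UmbilicalServerExample {
      public static void run(Configuration conf, LlapTaskUmbilicalProtocol umbilical,
          Token<JobTokenIdentifier> token) throws Exception {
        LlapTaskUmbilicalServer server =
            new LlapTaskUmbilicalServer(conf, umbilical, 1, "token-id", token);
        InetSocketAddress addr = server.getAddress(); // port 0 above means an ephemeral port
        System.out.println("Umbilical listening on " + addr);
        server.shutdownServer();
      }
    }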
http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/ql/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
index 4db4d32..d308ec8 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
@@ -18,10 +18,17 @@ package org.apache.hadoop.hive.llap;
import java.util.ArrayList;
import java.util.List;
+import java.util.Random;
import java.util.Set;
import javax.security.auth.login.LoginException;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.tez.TezProcessor;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TaskSpecBuilder;
+import org.apache.tez.runtime.api.impl.TaskSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,6 +45,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.io.FileNotFoundException;
+import java.util.UUID;
+
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.hadoop.conf.Configuration;
@@ -82,15 +91,14 @@ import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
-
import com.google.common.base.Preconditions;
public class LlapInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> {
private static final Logger LOG = LoggerFactory.getLogger(LlapInputFormat.class);
- private TezWork work;
- private Schema schema;
+ private final TezWork work;
+ private final Schema schema;
public LlapInputFormat(TezWork tezWork, Schema schema) {
this.work = tezWork;
@@ -98,22 +106,36 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
}
// need empty constructor for bean instantiation
- public LlapInputFormat() {}
+ public LlapInputFormat() {
+ // None of these fields should be required during getRecordReader,
+ // and should not be read.
+ work = null;
+ schema = null;
+ }
/*
* This proxy record reader has the duty of establishing a connected socket with LLAP, then fire
* off the work in the split to LLAP and finally return the connected socket back in an
* LlapRecordReader. The LlapRecordReader class reads the results from the socket.
*/
+ @Override
public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+ // Calls a static method to ensure none of the object fields are read.
+ return _getRecordReader(split, job, reporter);
+ }
+
+ private static RecordReader _getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws
+ IOException {
LlapInputSplit llapSplit = (LlapInputSplit)split;
// TODO: push event into LLAP
// this is just the portion that sets up the io to receive data
String host = split.getLocations()[0];
- String id = job.get(LlapOutputFormat.LLAP_OF_ID_KEY);
+
+ // TODO: need to construct id here. Format is queryId + "_" + taskIndex
+ String id = "foobar";
HiveConf conf = new HiveConf();
Socket socket = new Socket(host,
@@ -130,120 +152,8 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
return new LlapRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class);
}
- /*
- * getSplits() gets called as part of the GenericUDFGetSplits call to get splits. Here we create
- * an array of input splits from the work item we have, figure out the location for llap and pass
- * that back for the submission. getRecordReader method above uses that split info to assign the
- * work to llap.
- */
@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
- // TODO: need to build proto of plan
-
- DAG dag = DAG.create(work.getName());
- dag.setCredentials(job.getCredentials());
- // TODO: set access control? TezTask.setAccessControlsForCurrentUser(dag);
-
- DagUtils utils = DagUtils.getInstance();
- Context ctx = new Context(job);
- MapWork mapWork = (MapWork) work.getAllWork().get(0);
- // bunch of things get setup in the context based on conf but we need only the MR tmp directory
- // for the following method.
- JobConf wxConf = utils.initializeVertexConf(job, ctx, mapWork);
- Path scratchDir = utils.createTezDir(ctx.getMRScratchDir(), job);
- FileSystem fs = scratchDir.getFileSystem(job);
- try {
- LocalResource appJarLr = createJarLocalResource(utils.getExecJarPathLocal(), utils, job);
- Vertex wx = utils.createVertex(wxConf, mapWork, scratchDir, appJarLr,
- new ArrayList<LocalResource>(), fs, ctx, false, work,
- work.getVertexType(mapWork));
- dag.addVertex(wx);
- utils.addCredentials(mapWork, dag);
-
- // we have the dag now proceed to get the splits:
- HiveSplitGenerator splitGenerator = new HiveSplitGenerator(null);
- splitGenerator.initializeSplitGenerator(wxConf, mapWork);
- List<Event> eventList = splitGenerator.initialize();
-
- // hack - just serializing with kryo for now. This needs to be done properly
- InputSplit[] result = new InputSplit[eventList.size()];
- int i = 0;
- ByteArrayOutputStream bos = new ByteArrayOutputStream(10240);
-
- InputConfigureVertexTasksEvent configureEvent = (InputConfigureVertexTasksEvent)
- eventList.remove(0);
-
- List<TaskLocationHint> hints = configureEvent.getLocationHint().getTaskLocationHints();
- for (Event event: eventList) {
- TaskLocationHint hint = hints.remove(0);
- Set<String> hosts = hint.getHosts();
- SplitLocationInfo[] locations = new SplitLocationInfo[hosts.size()];
-
- int j = 0;
- for (String host: hosts) {
- locations[j++] = new SplitLocationInfo(host,false);
- }
-
- bos.reset();
- Kryo kryo = SerializationUtilities.borrowKryo();
- SerializationUtilities.serializeObjectByKryo(kryo, event, bos);
- SerializationUtilities.releaseKryo(kryo);
- result[i++] = new LlapInputSplit(bos.toByteArray(), locations, schema);
- }
- return result;
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-
- /**
- * Returns a local resource representing a jar. This resource will be used to execute the plan on
- * the cluster.
- *
- * @param localJarPath
- * Local path to the jar to be localized.
- * @return LocalResource corresponding to the localized hive exec resource.
- * @throws IOException
- * when any file system related call fails.
- * @throws LoginException
- * when we are unable to determine the user.
- * @throws URISyntaxException
- * when current jar location cannot be determined.
- */
- private LocalResource createJarLocalResource(String localJarPath, DagUtils utils,
- Configuration conf)
- throws IOException, LoginException, IllegalArgumentException, FileNotFoundException {
- FileStatus destDirStatus = utils.getHiveJarDirectory(conf);
- assert destDirStatus != null;
- Path destDirPath = destDirStatus.getPath();
-
- Path localFile = new Path(localJarPath);
- String sha = getSha(localFile, conf);
-
- String destFileName = localFile.getName();
-
- // Now, try to find the file based on SHA and name. Currently we require exact name match.
- // We could also allow cutting off versions and other stuff provided that SHA matches...
- destFileName = FilenameUtils.removeExtension(destFileName) + "-" + sha
- + FilenameUtils.EXTENSION_SEPARATOR + FilenameUtils.getExtension(destFileName);
-
- // TODO: if this method is ever called on more than one jar, getting the dir and the
- // list need to be refactored out to be done only once.
- Path destFile = new Path(destDirPath.toString() + "/" + destFileName);
- return utils.localizeResource(localFile, destFile, LocalResourceType.FILE, conf);
- }
-
- private String getSha(Path localFile, Configuration conf)
- throws IOException, IllegalArgumentException {
- InputStream is = null;
- try {
- FileSystem localFs = FileSystem.getLocal(conf);
- is = localFs.open(localFile);
- return DigestUtils.sha256Hex(is);
- } finally {
- if (is != null) {
- is.close();
- }
- }
+ throw new IOException("These are not the splits you are looking for.");
}
}
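The "foobar" placeholder in _getRecordReader corresponds to the id format that
MapRecordProcessor now writes under LLAP_OF_ID_KEY (queryId + "_" + taskIndex; see the
MapRecordProcessor hunk further down). A hypothetical helper showing the intended
construction, assuming queryId and taskIndex are available from the fragment metadata:

    public class LlapOutputIdExample {
      // Hypothetical: the real queryId/taskIndex must come from the submitted fragment.
      public static String outputId(String queryId, int taskIndex) {
        // Matches the format set for LlapOutputFormat.LLAP_OF_ID_KEY in MapRecordProcessor.
        return queryId + "_" + taskIndex;
      }
    }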
http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
index 78dbb34..4249a16 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
@@ -16,49 +16,49 @@
*/
package org.apache.hadoop.hive.llap;
-import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;
-import java.io.DataInputStream;
-import java.io.ByteArrayInputStream;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.hive.llap.io.api.LlapProxy;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.SplitLocationInfo;
-import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
+import java.io.IOException;
+
import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
+import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.AutoExpandingBufferWriteTransport;
import org.apache.thrift.transport.AutoExpandingBuffer;
-
-import com.google.common.base.Preconditions;
+import org.apache.thrift.transport.AutoExpandingBufferWriteTransport;
public class LlapInputSplit implements InputSplitWithLocationInfo {
- byte[] queryFragment;
+ byte[] planBytes;
+ byte[] fragmentBytes;
SplitLocationInfo[] locations;
Schema schema;
- public LlapInputSplit() {}
- public LlapInputSplit(byte[] queryFragment, SplitLocationInfo[] locations, Schema schema) {
- this.queryFragment = queryFragment;
+ // // Static
+ // ContainerIdString
+ // DagName
+ // VertexName
+ // FragmentNumber
+ // AttemptNumber - always 0
+ // FragmentIdentifierString - taskAttemptId
+
+ // ProcessorDescriptor
+ // InputSpec
+ // OutputSpec
+
+ // Tokens
+
+ // // Dynamic
+ //
+
+ public LlapInputSplit() {
+ }
+
+ public LlapInputSplit(byte[] planBytes, byte[] fragmentBytes, SplitLocationInfo[] locations, Schema schema) {
+ this.planBytes = planBytes;
+ this.fragmentBytes = fragmentBytes;
this.locations = locations;
this.schema = schema;
}
@@ -83,8 +83,11 @@ public class LlapInputSplit implements InputSplitWithLocationInfo {
@Override
public void write(DataOutput out) throws IOException {
- out.writeInt(queryFragment.length);
- out.write(queryFragment);
+ out.writeInt(planBytes.length);
+ out.write(planBytes);
+
+ out.writeInt(fragmentBytes.length);
+ out.write(fragmentBytes);
out.writeInt(locations.length);
for (int i = 0; i < locations.length; ++i) {
@@ -108,11 +111,13 @@ public class LlapInputSplit implements InputSplitWithLocationInfo {
@Override
public void readFields(DataInput in) throws IOException {
- byte[] queryFragment;
-
int length = in.readInt();
- queryFragment = new byte[length];
- in.readFully(queryFragment);
+ planBytes = new byte[length];
+ in.readFully(planBytes);
+
+ length = in.readInt();
+ fragmentBytes = new byte[length];
+ in.readFully(fragmentBytes);
length = in.readInt();
locations = new SplitLocationInfo[length];
@@ -124,7 +129,8 @@ public class LlapInputSplit implements InputSplitWithLocationInfo {
length = in.readInt();
try {
- AutoExpandingBufferWriteTransport transport = new AutoExpandingBufferWriteTransport(length, 2d);
+ AutoExpandingBufferWriteTransport transport =
+ new AutoExpandingBufferWriteTransport(length, 2d);
AutoExpandingBuffer buf = transport.getBuf();
in.readFully(buf.array(), 0, length);
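The split now carries two length-prefixed byte arrays (plan and fragment) ahead of the
locations and the Thrift-encoded schema. A round-trip sketch, assuming the write() and
readFields() halves shown above stay symmetric (payload bytes are placeholders):

    import org.apache.hadoop.hive.llap.LlapInputSplit;
    import org.apache.hadoop.hive.metastore.api.Schema;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.mapred.SplitLocationInfo;

    public class LlapSplitRoundTrip {
      public static void main(String[] args) throws Exception {
        LlapInputSplit split = new LlapInputSplit(
            new byte[] {1, 2}, new byte[] {3, 4}, // plan and fragment payloads (placeholders)
            new SplitLocationInfo[] { new SplitLocationInfo("host1", false) },
            new Schema());
        DataOutputBuffer out = new DataOutputBuffer();
        split.write(out);
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), 0, out.getLength());
        LlapInputSplit copy = new LlapInputSplit();
        copy.readFields(in); // reads the fields back in the same order
      }
    }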
http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java b/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
new file mode 100644
index 0000000..a9a3738
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
@@ -0,0 +1,65 @@
+package org.apache.hadoop.hive.llap;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+
+public class SubmitWorkInfo implements Writable {
+
+ private TaskSpec taskSpec;
+ private ApplicationId fakeAppId;
+
+ public SubmitWorkInfo(TaskSpec taskSpec, ApplicationId fakeAppId) {
+ this.taskSpec = taskSpec;
+ this.fakeAppId = fakeAppId;
+ }
+
+ // Empty constructor for writable etc.
+ public SubmitWorkInfo() {
+ }
+
+ public TaskSpec getTaskSpec() {
+ return taskSpec;
+ }
+
+ public ApplicationId getFakeAppId() {
+ return fakeAppId;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ taskSpec.write(out);
+ out.writeLong(fakeAppId.getClusterTimestamp());
+ out.writeInt(fakeAppId.getId());
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ taskSpec = new TaskSpec();
+ taskSpec.readFields(in);
+ long appIdTs = in.readLong();
+ int appIdId = in.readInt();
+ fakeAppId = ApplicationId.newInstance(appIdTs, appIdId);
+ }
+
+ public static byte[] toBytes(SubmitWorkInfo submitWorkInfo) throws IOException {
+ DataOutputBuffer dob = new DataOutputBuffer();
+ submitWorkInfo.write(dob);
+ return dob.getData();
+ }
+
+ public static SubmitWorkInfo fromBytes(byte[] submitWorkInfoBytes) throws IOException {
+ DataInputBuffer dib = new DataInputBuffer();
+ dib.reset(submitWorkInfoBytes, 0, submitWorkInfoBytes.length);
+ SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo();
+ submitWorkInfo.readFields(dib);
+ return submitWorkInfo;
+ }
+
+}
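SubmitWorkInfo is a plain Writable, so the byte helpers amount to a Writable round-trip
over DataOutputBuffer/DataInputBuffer. A short sketch (the TaskSpec is assumed to come from
the caller; the ApplicationId mirrors the "fake" id used above):

    import org.apache.hadoop.hive.llap.SubmitWorkInfo;
    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.tez.runtime.api.impl.TaskSpec;

    public class SubmitWorkInfoExample {
      public static SubmitWorkInfo roundTrip(TaskSpec spec) throws Exception {
        ApplicationId fakeAppId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
        byte[] bytes = SubmitWorkInfo.toBytes(new SubmitWorkInfo(spec, fakeAppId));
        return SubmitWorkInfo.fromBytes(bytes);
      }
    }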
http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index b0cda82..011e459 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -33,6 +33,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.serde2.SerDeException;
@@ -44,6 +47,7 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.split.SplitLocationProvider;
import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.VertexLocationHint;
@@ -82,10 +86,30 @@ public class HiveSplitGenerator extends InputInitializer {
private final SplitGrouper splitGrouper = new SplitGrouper();
private SplitLocationProvider splitLocationProvider = null;
- public void initializeSplitGenerator(Configuration conf, MapWork work) {
+
+ // TODO RSHACK This entire method needs to be reworked. Put back final fields, separate into reusable components etc.
+ public void initializeSplitGenerator(Configuration conf, MapWork work) throws IOException {
+
this.conf = conf;
this.work = work;
this.jobConf = new JobConf(conf);
+
+ // TODO RSHACK - assuming grouping enabled always.
+ userPayloadProto = MRInputUserPayloadProto.newBuilder().setGroupingEnabled(true).build();
+
+ this.splitLocationProvider = Utils.getSplitLocationProvider(conf, LOG);
+ LOG.info("SplitLocationProvider: " + splitLocationProvider);
+
+ // Read all credentials into the credentials instance stored in JobConf.
+ ShimLoader.getHadoopShims().getMergedCredentials(jobConf);
+
+ this.work = Utilities.getMapWork(jobConf);
+
+ // Events can start coming in the moment the InputInitializer is created. The pruner
+ // must be set up and initialized here so that it sets up its structures to start accepting events.
+ // Setting it up in initialize leads to a window where events may come in before the pruner is
+ // initialized, which may cause it to drop events.
+ // TODO RSHACK - No dynamic partition pruning
+ pruner = null;
}
public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOException,
@@ -129,7 +153,9 @@ public class HiveSplitGenerator extends InputInitializer {
conf.getBoolean("mapreduce.tez.input.initializer.serialize.event.payload", true);
// perform dynamic partition pruning
- pruner.prune();
+ if (pruner != null) {
+ pruner.prune();
+ }
InputSplitInfoMem inputSplitInfo = null;
boolean generateConsistentSplits = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_TEZ_GENERATE_CONSISTENT_SPLITS);
@@ -142,9 +168,20 @@ public class HiveSplitGenerator extends InputInitializer {
(InputFormat<?, ?>) ReflectionUtils.newInstance(JavaUtils.loadClass(realInputFormatName),
jobConf);
- int totalResource = getContext().getTotalAvailableResource().getMemory();
- int taskResource = getContext().getVertexTaskResource().getMemory();
- int availableSlots = totalResource / taskResource;
+ int totalResource = 0;
+ int taskResource = 0;
+ int availableSlots = 0;
+ // FIXME. Do the right thing Luke.
+ if (getContext() == null) {
+ // for now, totalResource = taskResource for llap
+ availableSlots = 1;
+ } else {
+ totalResource = getContext().getTotalAvailableResource().getMemory();
+ taskResource = getContext().getVertexTaskResource().getMemory();
+ availableSlots = totalResource / taskResource;
+ }
if (HiveConf.getLongVar(conf, HiveConf.ConfVars.MAPREDMINSPLITSIZE, 1) <= 1) {
// broken configuration from mapred-default.xml
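For the context-less (LLAP external) path the slot count is pinned to 1 for now; with a
context, slots are simply total vertex memory divided by per-task memory. A tiny
illustration of that arithmetic (the numbers are made up):

    public class SlotMathExample {
      public static void main(String[] args) {
        int totalResource = 16384; // MB available to the vertex (illustrative)
        int taskResource = 4096;   // MB per task (illustrative)
        System.out.println(totalResource / taskResource); // 4 available slots
      }
    }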
http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 0584ad8..3fe70ab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.io.api.LlapProxy;
import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.llap.LlapOutputFormat;
import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
import org.apache.hadoop.hive.ql.exec.MapOperator;
@@ -94,6 +95,7 @@ public class MapRecordProcessor extends RecordProcessor {
super(jconf, context);
String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
if (LlapProxy.isDaemon()) { // do not cache plan
+ jconf.set(LlapOutputFormat.LLAP_OF_ID_KEY, queryId + "_" + context.getTaskIndex());
cache = new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
} else {
cache = ObjectCacheFactory.getCache(jconf, queryId);
http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java
index 598520c..0997233 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java
@@ -1315,12 +1315,15 @@ public class TypeCheckProcFactory {
try {
return getXpathOrFuncExprNodeDesc(expr, isFunction, children, ctx);
} catch (UDFArgumentTypeException e) {
+ LOG.error("UDFArgumentTypeException: ", e);
throw new SemanticException(ErrorMsg.INVALID_ARGUMENT_TYPE.getMsg(expr
.getChild(childrenBegin + e.getArgumentId()), e.getMessage()));
} catch (UDFArgumentLengthException e) {
+ LOG.error("UDFArgumentLengthException: ", e);
throw new SemanticException(ErrorMsg.INVALID_ARGUMENT_LENGTH.getMsg(
expr, e.getMessage()));
} catch (UDFArgumentException e) {
+ LOG.error("UDFArgumentException: ", e);
throw new SemanticException(ErrorMsg.INVALID_ARGUMENT.getMsg(expr, e
.getMessage()));
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java
index 3b7dcd9..9c7e1f2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java
@@ -18,6 +18,20 @@
package org.apache.hadoop.hive.ql.udf.generic;
+import org.apache.hadoop.hive.llap.LlapInputSplit;
+import org.apache.hadoop.hive.llap.SubmitWorkInfo;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
+
+import javax.security.auth.login.LoginException;
+
import java.util.Arrays;
import java.util.List;
import java.util.ArrayList;
@@ -28,6 +42,17 @@ import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.DataOutput;
+import com.esotericsoftware.kryo.Kryo;
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStream;
+import java.io.InputStream;
+
+import org.apache.hadoop.hive.ql.exec.tez.TezProcessor;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TaskSpecBuilder;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,6 +94,55 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.metastore.api.Schema;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.io.FileNotFoundException;
+import java.util.UUID;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.runtime.api.Event;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
+import org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
+import org.apache.tez.dag.api.TaskLocationHint;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
+import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
+import org.apache.tez.runtime.api.InputInitializer;
+import org.apache.tez.runtime.api.InputInitializerContext;
+import org.apache.tez.runtime.api.InputSpecUpdate;
+import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
+import org.apache.tez.runtime.api.events.InputDataInformationEvent;
+import org.apache.tez.runtime.api.events.InputInitializerEvent;
/**
* GenericUDFGetSplits.
@@ -177,7 +251,6 @@ public class GenericUDFGetSplits extends GenericUDF {
}
Path data = null;
- InputFormat inp = null;
String ifc = null;
TezWork tezWork = ((TezTask)roots.get(0)).getWork();
@@ -214,33 +287,13 @@ public class GenericUDFGetSplits extends GenericUDF {
}
tezWork = ((TezTask)roots.get(0)).getWork();
-
- // Table table = db.getTable(tableName);
- // if (table.isPartitioned()) {
- // throw new UDFArgumentException("Table " + tableName + " is partitioned.");
- // }
- // data = table.getDataLocation();
- // LOG.info("looking at: "+data);
-
- // ifc = table.getInputFormatClass().toString();
-
- // inp = ReflectionUtils.newInstance(table.getInputFormatClass(), jc);
}
MapWork w = (MapWork)tezWork.getAllWork().get(0);
- inp = new LlapInputFormat(tezWork, schema);
ifc = LlapInputFormat.class.toString();
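+ // Note: Class#toString() returns "class <fqcn>" rather than the bare class name; if callers
+ // ever need Class.forName() on this value, Class#getName() would be the safer choice.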
try {
- if (inp instanceof JobConfigurable) {
- ((JobConfigurable) inp).configure(jc);
- }
-
- if (inp instanceof FileInputFormat) {
- ((FileInputFormat) inp).addInputPath(jc, data);
- }
-
- for (InputSplit s: inp.getSplits(jc, num)) {
+ for (InputSplit s: getSplits(jc, num, tezWork, schema)) {
Object[] os = new Object[3];
os[0] = ifc;
os[1] = s.getClass().toString();
@@ -257,6 +310,133 @@ public class GenericUDFGetSplits extends GenericUDF {
return retArray;
}
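+ /**
+ * Generates LLAP input splits for the given TezWork: builds a transient DAG and vertex, runs
+ * HiveSplitGenerator to produce the split events, then packages each Kryo-serialized event,
+ * the shared SubmitWorkInfo, and the split's location hints into a LlapInputSplit.
+ */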
+ public InputSplit[] getSplits(JobConf job, int numSplits, TezWork work, Schema schema) throws IOException {
+ DAG dag = DAG.create(work.getName());
+ dag.setCredentials(job.getCredentials());
+ // TODO: set access control? TezTask.setAccessControlsForCurrentUser(dag);
+
+ DagUtils utils = DagUtils.getInstance();
+ Context ctx = new Context(job);
+ MapWork mapWork = (MapWork) work.getAllWork().get(0);
+ // A bunch of things get set up in the Context based on conf, but we only need the MR tmp
+ // directory for the following method.
+ JobConf wxConf = utils.initializeVertexConf(job, ctx, mapWork);
+ Path scratchDir = utils.createTezDir(ctx.getMRScratchDir(), job);
+ FileSystem fs = scratchDir.getFileSystem(job);
+ try {
+ LocalResource appJarLr = createJarLocalResource(utils.getExecJarPathLocal(), utils, job);
+ Vertex wx = utils.createVertex(wxConf, mapWork, scratchDir, appJarLr,
+ new ArrayList<LocalResource>(), fs, ctx, false, work,
+ work.getVertexType(mapWork));
+ String vertexName = wx.getName();
+ dag.addVertex(wx);
+ utils.addCredentials(mapWork, dag);
+
+ // we have the dag now proceed to get the splits:
+ HiveSplitGenerator splitGenerator = new HiveSplitGenerator(null);
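+ // Location hints are only meaningful when split generation is deterministic, so insist on
+ // the consistent-splits settings before computing placements.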
+ Preconditions.checkState(HiveConf.getBoolVar(wxConf,
+ HiveConf.ConfVars.HIVE_TEZ_GENERATE_CONSISTENT_SPLITS));
+ Preconditions.checkState(HiveConf.getBoolVar(wxConf,
+ HiveConf.ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS));
+ splitGenerator.initializeSplitGenerator(wxConf, mapWork);
+ List<Event> eventList = splitGenerator.initialize();
+
+ // Hack: just serializing with Kryo for now. This needs to be done properly.
+ // eventList.get(0) is the InputConfigureVertexTasksEvent; events 1..n-1 are the per-split
+ // events, so the result array holds eventList.size() - 1 entries.
+ InputSplit[] result = new InputSplit[eventList.size() - 1];
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(10240);
+
+ InputConfigureVertexTasksEvent configureEvent = (InputConfigureVertexTasksEvent) eventList.get(0);
+
+ List<TaskLocationHint> hints = configureEvent.getLocationHint().getTaskLocationHints();
+
+ Preconditions.checkState(hints.size() == eventList.size() - 1);
+
+ LOG.info("DBG: Number of splits: " + (eventList.size() - 1));
+ // The TaskSpec and SubmitWorkInfo are identical for every split, so build them once outside
+ // the loop; this also ensures that all splits share the same fake ApplicationId.
+ TaskSpec taskSpec = new TaskSpecBuilder().constructTaskSpec(dag, vertexName, eventList.size() - 1);
+ ApplicationId fakeApplicationId = ApplicationId.newInstance(new Random().nextInt(), 0);
+ SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo(taskSpec, fakeApplicationId);
+ byte[] submitWorkBytes = SubmitWorkInfo.toBytes(submitWorkInfo);
+
+ for (int i = 1; i < eventList.size(); i++) {
+ // Grab the per-split event and serialize it directly, since that is easy to do for now.
+ Event event = eventList.get(i);
+ TaskLocationHint hint = hints.get(i - 1);
+ Set<String> hosts = hint.getHosts();
+ LOG.info("DBG: Using locations: " + hosts.toString());
+ if (hosts.size() != 1) {
+ LOG.warn("DBG: Bad # of locations: " + hosts.size());
+ }
+ SplitLocationInfo[] locations = new SplitLocationInfo[hosts.size()];
+
+ int j = 0;
+ for (String host : hosts) {
+ locations[j++] = new SplitLocationInfo(host, false);
+ }
+
+ bos.reset();
+ Kryo kryo = SerializationUtilities.borrowKryo();
+ try {
+ SerializationUtilities.serializeObjectByKryo(kryo, event, bos);
+ } finally {
+ SerializationUtilities.releaseKryo(kryo);
+ }
+
+ result[i - 1] = new LlapInputSplit(submitWorkBytes, bos.toByteArray(), locations, schema);
+ }
+ return result;
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
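+
+ // Rough consumer-side sketch (hypothetical here; the actual reader lives in
+ // org.apache.hadoop.hive.llap.LlapInputFormat). Assuming the writer above went through
+ // Kryo#writeObject, the event bytes carried by a split would be decoded roughly as:
+ //
+ // Kryo kryo = SerializationUtilities.borrowKryo();
+ // try {
+ // InputDataInformationEvent event = kryo.readObject(
+ // new com.esotericsoftware.kryo.io.Input(new java.io.ByteArrayInputStream(bytes)),
+ // InputDataInformationEvent.class);
+ // } finally {
+ // SerializationUtilities.releaseKryo(kryo);
+ // }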
+
+ /**
+ * Returns a local resource representing a jar. This resource will be used to execute the plan on
+ * the cluster.
+ *
+ * @param localJarPath
+ * Local path to the jar to be localized.
+ * @return LocalResource corresponding to the localized hive exec resource.
+ * @throws IOException
+ * when any file system related call fails.
+ * @throws LoginException
+ * when we are unable to determine the user.
+ * @throws URISyntaxException
+ * when current jar location cannot be determined.
+ */
+ private LocalResource createJarLocalResource(String localJarPath, DagUtils utils,
+ Configuration conf)
+ throws IOException, LoginException, IllegalArgumentException, FileNotFoundException {
+ FileStatus destDirStatus = utils.getHiveJarDirectory(conf);
+ assert destDirStatus != null;
+ Path destDirPath = destDirStatus.getPath();
+
+ Path localFile = new Path(localJarPath);
+ String sha = getSha(localFile, conf);
+
+ String destFileName = localFile.getName();
+
+ // Now, try to find the file based on SHA and name. Currently we require exact name match.
+ // We could also allow cutting off versions and other stuff provided that SHA matches...
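+ // Illustrative example: for hive-exec-2.1.0.jar with a SHA-256 abbreviated here as "3f2a9b",
+ // the destination name becomes hive-exec-2.1.0-3f2a9b.jar.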
+ destFileName = FilenameUtils.removeExtension(destFileName) + "-" + sha
+ + FilenameUtils.EXTENSION_SEPARATOR + FilenameUtils.getExtension(destFileName);
+
+ // TODO: if this method is ever called on more than one jar, getting the dir and the
+ // list need to be refactored out to be done only once.
+ Path destFile = new Path(destDirPath, destFileName);
+ return utils.localizeResource(localFile, destFile, LocalResourceType.FILE, conf);
+ }
+
+ private String getSha(Path localFile, Configuration conf)
+ throws IOException, IllegalArgumentException {
+ FileSystem localFs = FileSystem.getLocal(conf);
+ try (InputStream is = localFs.open(localFile)) {
+ return DigestUtils.sha256Hex(is);
+ }
+ }
+
@Override
public String getDisplayString(String[] children) {
assert children.length == 2;
http://git-wip-us.apache.org/repos/asf/hive/blob/f272acea/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java b/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java
new file mode 100644
index 0000000..d0c7c5a
--- /dev/null
+++ b/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java
@@ -0,0 +1,45 @@
+package org.apache.tez.dag.api;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+
+// Proxy class within the tez.api package to access package-private methods.
+public class TaskSpecBuilder {
+
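+ /**
+ * Builds a base TaskSpec for a vertex that has exactly one root input and one leaf output
+ * (enforced below). Illustrative call, mirroring the call site in GenericUDFGetSplits:
+ * new TaskSpecBuilder().constructTaskSpec(dag, vertexName, numSplits).
+ */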
+ public TaskSpec constructTaskSpec(DAG dag, String vertexName, int numSplits) {
+ Vertex vertex = dag.getVertex(vertexName);
+ ProcessorDescriptor processorDescriptor = vertex.getProcessorDescriptor();
+ List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputs =
+ vertex.getInputs();
+ List<RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> outputs =
+ vertex.getOutputs();
+
+ // TODO RSHACK - for now these must be of size 1.
+ Preconditions.checkState(inputs.size() == 1);
+ Preconditions.checkState(outputs.size() == 1);
+
+ List<InputSpec> inputSpecs = new ArrayList<>();
+ for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input : inputs) {
+ InputSpec inputSpec = new InputSpec(input.getName(), input.getIODescriptor(), 1);
+ inputSpecs.add(inputSpec);
+ }
+
+ List<OutputSpec> outputSpecs = new ArrayList<>();
+ for (RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> output : outputs) {
+ OutputSpec outputSpec = new OutputSpec(output.getName(), output.getIODescriptor(), 1);
+ outputSpecs.add(outputSpec);
+ }
+
+ TaskSpec taskSpec = TaskSpec
+ .createBaseTaskSpec(dag.getName(), vertexName, numSplits, processorDescriptor, inputSpecs,
+ outputSpecs, null);
+
+ return taskSpec;
+ }
+
+}