Posted to commits@hive.apache.org by jd...@apache.org on 2016/05/06 17:24:53 UTC
[03/39] hive git commit: HIVE-13140. Wire the client to submit execution fragments. (Gunther Hagleitner, Siddharth Seth and Vikram Dixit K)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2e042cc1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2e042cc1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2e042cc1
Branch: refs/heads/master
Commit: 2e042cc159c0e7e044297fc4d6b177a8841eb7fd
Parents: f272ace
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Feb 23 23:56:38 2016 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Feb 23 23:56:38 2016 -0800
----------------------------------------------------------------------
.../org/apache/hive/jdbc/LlapInputFormat.java | 7 +-
.../hadoop/hive/llap/LlapInputFormat.java | 198 ++++++++++++
.../ext/LlapTaskUmbilicalExternalClient.java | 23 +-
.../apache/hadoop/hive/llap/LlapInputSplit.java | 77 ++---
.../apache/hadoop/hive/llap/SubmitWorkInfo.java | 42 ++-
.../ql/udf/generic/GenericUDFGetSplits.java | 317 +++++++++----------
.../org/apache/tez/dag/api/TaskSpecBuilder.java | 27 +-
7 files changed, 467 insertions(+), 224 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/2e042cc1/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java b/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
index c38dd82..e662414 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
@@ -130,8 +130,9 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
@Override
public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
- LlapInputSplit llapSplit = (LlapInputSplit)split;
- return llapSplit.getInputFormat().getRecordReader(llapSplit.getSplit(), job, reporter);
+ try {
+ return ((InputFormat)Class.forName("org.apache.hadoop.hive.llap.LlapInputFormat").newInstance()).getRecordReader(split, job, reporter);
+ } catch (Exception e) { throw new IOException(e); }
}
@Override
@@ -160,7 +161,7 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
ResultSet res = stmt.executeQuery(sql);
while (res.next()) {
// deserialize split
- DataInput in = new DataInputStream(new ByteArrayInputStream(res.getBytes(3)));
+ DataInput in = new DataInputStream(res.getBinaryStream(3));
InputSplit is = (InputSplitWithLocationInfo)Class.forName(res.getString(2)).newInstance(); // todo setAccessible on ctor
is.readFields(in);
ins.add(is);
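The jdbc-side change above swaps the direct call through the split for a reflective load of the llap-server implementation, presumably so the jdbc module carries no compile-time dependency on llap-server. A minimal sketch of that pattern, with the wrapper class invented here for illustration:

import java.io.IOException;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

class ReflectiveFormatLoader {
  @SuppressWarnings({"unchecked", "rawtypes"})
  static RecordReader delegate(InputSplit split, JobConf job, Reporter reporter)
      throws IOException {
    try {
      // Load the server-side format by name at runtime; any reflection or
      // reader failure surfaces uniformly as an IOException.
      InputFormat format = (InputFormat)
          Class.forName("org.apache.hadoop.hive.llap.LlapInputFormat").newInstance();
      return format.getRecordReader(split, job, reporter);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }
}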
http://git-wip-us.apache.org/repos/asf/hive/blob/2e042cc1/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
new file mode 100644
index 0000000..cf13c1e
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient;
+import org.apache.hadoop.hive.llap.tez.Converters;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LlapInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LlapInputFormat.class);
+
+
+ public LlapInputFormat() {
+ }
+
+ /*
+ * This proxy record reader has the duty of establishing a connected socket with LLAP, then fire
+ * off the work in the split to LLAP and finally return the connected socket back in an
+ * LlapRecordReader. The LlapRecordReader class reads the results from the socket.
+ */
+ @Override
+ public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job,
+ Reporter reporter) throws IOException {
+
+ LlapInputSplit llapSplit = (LlapInputSplit) split;
+ SubmitWorkInfo submitWorkInfo = SubmitWorkInfo.fromBytes(llapSplit.getPlanBytes());
+
+ int llapSubmitPort = HiveConf.getIntVar(job, HiveConf.ConfVars.LLAP_DAEMON_RPC_PORT);
+
+ LOG.info("ZZZ: DBG: Starting LlapTaskUmbilicalExternalClient");
+
+ LlapTaskUmbilicalExternalClient llapClient =
+ new LlapTaskUmbilicalExternalClient(job, submitWorkInfo.getTokenIdentifier(),
+ submitWorkInfo.getToken());
+ llapClient.init(job);
+ llapClient.start();
+
+ LOG.info("ZZZ: DBG: Crated LlapClient");
+ // TODO KKK Shutdown the llap client.
+
+ SubmitWorkRequestProto submitWorkRequestProto =
+ constructSubmitWorkRequestProto(submitWorkInfo, llapSplit.getSplitNum(),
+ llapClient.getAddress(), submitWorkInfo.getToken());
+
+ LOG.info("ZZZ: DBG: Created submitWorkRequest for: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString());
+
+ TezEvent tezEvent = new TezEvent();
+ DataInputBuffer dib = new DataInputBuffer();
+ dib.reset(llapSplit.getFragmentBytes(), 0, llapSplit.getFragmentBytes().length);
+ tezEvent.readFields(dib);
+ List<TezEvent> tezEventList = Lists.newArrayList();
+ tezEventList.add(tezEvent);
+
+ // this is just the portion that sets up the io to receive data
+ String host = split.getLocations()[0];
+
+ llapClient.submitWork(submitWorkRequestProto, host, llapSubmitPort, tezEventList);
+
+ String id = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID) + "_" + llapSplit.getSplitNum();
+
+ HiveConf conf = new HiveConf();
+ Socket socket = new Socket(host,
+ conf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT));
+
+ LOG.debug("Socket connected");
+
+ socket.getOutputStream().write(id.getBytes());
+ socket.getOutputStream().write(0);
+ socket.getOutputStream().flush();
+
+ LOG.debug("Registered id: " + id);
+
+ return new LlapRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class);
+ }
+
+ @Override
+ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+ throw new IOException("These are not the splits you are looking for.");
+ }
+
+ private SubmitWorkRequestProto constructSubmitWorkRequestProto(SubmitWorkInfo submitWorkInfo,
+ int taskNum,
+ InetSocketAddress address,
+ Token<JobTokenIdentifier> token) throws
+ IOException {
+ TaskSpec taskSpec = submitWorkInfo.getTaskSpec();
+ ApplicationId appId = submitWorkInfo.getFakeAppId();
+
+ SubmitWorkRequestProto.Builder builder = SubmitWorkRequestProto.newBuilder();
+ // This works, assuming the executor is running within YARN.
+ LOG.info("DBG: Setting user in submitWorkRequest to: " +
+ System.getenv(ApplicationConstants.Environment.USER.name()));
+ builder.setUser(System.getenv(ApplicationConstants.Environment.USER.name()));
+ builder.setApplicationIdString(appId.toString());
+ builder.setAppAttemptNumber(0);
+ builder.setTokenIdentifier(appId.toString());
+
+ ContainerId containerId =
+ ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, 0), taskNum);
+ builder.setContainerIdString(containerId.toString());
+
+
+ builder.setAmHost(address.getHostName());
+ builder.setAmPort(address.getPort());
+ Credentials taskCredentials = new Credentials();
+ // Credentials can change across DAGs. Ideally construct only once per DAG.
+ // TODO Figure out where credentials will come from. Normally Hive sets up
+ // URLs on the tez dag, for which Tez acquires credentials.
+
+// taskCredentials.addAll(getContext().getCredentials());
+
+// Preconditions.checkState(currentQueryIdentifierProto.getDagIdentifier() ==
+// taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId());
+// ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto);
+// if (credentialsBinary == null) {
+// credentialsBinary = serializeCredentials(getContext().getCredentials());
+// credentialMap.putIfAbsent(currentQueryIdentifierProto, credentialsBinary.duplicate());
+// } else {
+// credentialsBinary = credentialsBinary.duplicate();
+// }
+// builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
+ Credentials credentials = new Credentials();
+ TokenCache.setSessionToken(token, credentials);
+ ByteBuffer credentialsBinary = serializeCredentials(credentials);
+ builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
+
+
+ builder.setFragmentSpec(Converters.convertTaskSpecToProto(taskSpec));
+
+ FragmentRuntimeInfo.Builder runtimeInfo = FragmentRuntimeInfo.newBuilder();
+ runtimeInfo.setCurrentAttemptStartTime(System.currentTimeMillis());
+ runtimeInfo.setWithinDagPriority(0);
+ runtimeInfo.setDagStartTime(submitWorkInfo.getCreationTime());
+ runtimeInfo.setFirstAttemptStartTime(submitWorkInfo.getCreationTime());
+ runtimeInfo.setNumSelfAndUpstreamTasks(taskSpec.getVertexParallelism());
+ runtimeInfo.setNumSelfAndUpstreamCompletedTasks(0);
+
+
+ builder.setUsingTezAm(false);
+ builder.setFragmentRuntimeInfo(runtimeInfo.build());
+ return builder.build();
+ }
+
+ private ByteBuffer serializeCredentials(Credentials credentials) throws IOException {
+ Credentials containerCredentials = new Credentials();
+ containerCredentials.addAll(credentials);
+ DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
+ containerCredentials.writeTokenStorageToStream(containerTokens_dob);
+ return ByteBuffer.wrap(containerTokens_dob.getData(), 0, containerTokens_dob.getLength());
+ }
+}
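The getRecordReader() above registers the fragment with the daemon's output service by writing the fragment id followed by a single zero byte, then reads the result rows back on the same socket. A stripped-down sketch of that handshake, assuming host, port and id are supplied by the caller (the helper class is invented for illustration):

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

class OutputServiceHandshake {
  // Connects to the LLAP output service, registers the fragment id, and
  // returns the stream the row data will arrive on.
  static InputStream register(String host, int port, String id) throws IOException {
    Socket socket = new Socket(host, port);
    OutputStream out = socket.getOutputStream();
    out.write(id.getBytes("UTF-8")); // the diff uses the platform default charset
    out.write(0);                    // a single zero byte terminates the id
    out.flush();
    return socket.getInputStream();
  }
}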
http://git-wip-us.apache.org/repos/asf/hive/blob/2e042cc1/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
index ecc032d..4305682 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -1,6 +1,8 @@
package org.apache.hadoop.hive.llap.ext;
import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -55,6 +57,7 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
this.sessionToken = sessionToken;
// TODO. No support for the LLAP token yet. Add support for configurable threads, however 1 should always be enough.
this.communicator = new LlapProtocolClientProxy(1, conf, null);
+ this.communicator.init(conf);
}
@Override
@@ -62,6 +65,7 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
int numHandlers = HiveConf.getIntVar(conf,
HiveConf.ConfVars.LLAP_TMP_EXT_CLIENT_NUM_SERVER_HANDLERS);
llapTaskUmbilicalServer = new LlapTaskUmbilicalServer(conf, umbilical, numHandlers, tokenIdentifier, sessionToken);
+ communicator.start();
}
@Override
@@ -72,24 +76,31 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
}
}
+ public InetSocketAddress getAddress() {
+ return llapTaskUmbilicalServer.getAddress();
+ }
+
/**
* Submit the work for actual execution. This should always have the usingTezAm flag disabled
* @param submitWorkRequestProto
*/
- public void submitWork(final SubmitWorkRequestProto submitWorkRequestProto, String llapHost, int llapPort) {
+ public void submitWork(final SubmitWorkRequestProto submitWorkRequestProto, String llapHost, int llapPort, List<TezEvent> tezEvents) {
Preconditions.checkArgument(submitWorkRequestProto.getUsingTezAm() == false);
- // Store the actual event first. To be returned on the first heartbeat.
- Event mrInputEvent = null;
- // Construct a TezEvent out of this, to send it out on the next heaertbeat
+ LOG.warn("ZZZ: DBG: " + " Submitting fragment: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString() + " on host: " + llapHost + ", port=" + llapPort);
+// LOG.info("ZZZ: DBG: " + " Complete SubmitWorkRequest: " + submitWorkRequestProto);
// submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString()
+ LOG.info("ZZZ: DBG: Received {} events for {}", tezEvents.size(), submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString());
+ // Register the pending events to be sent for this spec.
+ pendingEvents.putIfAbsent(submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(), tezEvents);
// Send out the actual SubmitWorkRequest
communicator.sendSubmitWork(submitWorkRequestProto, llapHost, llapPort,
new LlapProtocolClientProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.SubmitWorkResponseProto>() {
+
@Override
public void setResponse(LlapDaemonProtocolProtos.SubmitWorkResponseProto response) {
if (response.hasSubmissionState()) {
@@ -110,6 +121,7 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
+
// // TODO Also send out information saying that the fragment is finishable - if that is not already included in the main fragment.
// // This entire call is only required if we're doing more than scans. MRInput has no dependencies and is always finishable
// QueryIdentifierProto queryIdentifier = QueryIdentifierProto
@@ -157,6 +169,9 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
LOG.info("ZZZ: DBG: Received heartbeat from taskAttemptId: " + taskAttemptId.toString());
List<TezEvent> tezEvents = pendingEvents.remove(taskAttemptId.toString());
+ if (tezEvents == null) {
+ tezEvents = Collections.emptyList();
+ }
response.setLastRequestId(request.getRequestId());
// Irrelevant from eventIds. This can be tracked in the AM itself, instead of polluting the task.
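The submitWork() change above parks the fragment's TezEvents in the pendingEvents map and hands them out exactly once, on the first heartbeat, falling back to an empty list if nothing was registered. The same deliver-once pattern in isolation (class and method names are illustrative):

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class PendingEventQueue<E> {
  private final ConcurrentMap<String, List<E>> pending = new ConcurrentHashMap<>();

  // Called at submit time: remember the events for this fragment id.
  void registerPending(String fragmentId, List<E> events) {
    pending.putIfAbsent(fragmentId, events);
  }

  // Called on the first heartbeat: remove and return the events so they are
  // delivered at most once; later heartbeats see an empty list.
  List<E> drain(String fragmentId) {
    List<E> events = pending.remove(fragmentId);
    return events == null ? Collections.<E>emptyList() : events;
  }
}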
http://git-wip-us.apache.org/repos/asf/hive/blob/2e042cc1/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
index 4249a16..d26a579 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputSplit.java
@@ -23,44 +23,26 @@ import java.io.IOException;
import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
import org.apache.hadoop.mapred.SplitLocationInfo;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.AutoExpandingBuffer;
-import org.apache.thrift.transport.AutoExpandingBufferWriteTransport;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TSerializer;
public class LlapInputSplit implements InputSplitWithLocationInfo {
+ int splitNum;
byte[] planBytes;
byte[] fragmentBytes;
SplitLocationInfo[] locations;
Schema schema;
-
- // // Static
- // ContainerIdString
- // DagName
- // VertexName
- // FragmentNumber
- // AttemptNumber - always 0
- // FragmentIdentifierString - taskAttemptId
-
- // ProcessorDescsriptor
- // InputSpec
- // OutputSpec
-
- // Tokens
-
- // // Dynamic
- //
-
public LlapInputSplit() {
}
- public LlapInputSplit(byte[] planBytes, byte[] fragmentBytes, SplitLocationInfo[] locations, Schema schema) {
+ public LlapInputSplit(int splitNum, byte[] planBytes, byte[] fragmentBytes, SplitLocationInfo[] locations, Schema schema) {
this.planBytes = planBytes;
this.fragmentBytes = fragmentBytes;
this.locations = locations;
this.schema = schema;
+ this.splitNum = splitNum;
}
public Schema getSchema() {
@@ -81,8 +63,23 @@ public class LlapInputSplit implements InputSplitWithLocationInfo {
return locs;
}
+ public int getSplitNum() {
+ return splitNum;
+ }
+
+ public byte[] getPlanBytes() {
+ return planBytes;
+ }
+
+ public byte[] getFragmentBytes() {
+ return fragmentBytes;
+ }
+
+
+
@Override
public void write(DataOutput out) throws IOException {
+ out.writeInt(splitNum);
out.writeInt(planBytes.length);
out.write(planBytes);
@@ -97,20 +94,24 @@ public class LlapInputSplit implements InputSplitWithLocationInfo {
byte[] binarySchema;
try {
- AutoExpandingBufferWriteTransport transport = new AutoExpandingBufferWriteTransport(1024, 2d);
- TProtocol protocol = new TBinaryProtocol(transport);
- schema.write(protocol);
- binarySchema = transport.getBuf().array();
+ TSerializer serializer = new TSerializer();
+ byte[] serialzied = serializer.serialize(schema);
+ out.writeInt(serialzied.length);
+ out.write(serialzied);
+// AutoExpandingBufferWriteTransport transport = new AutoExpandingBufferWriteTransport(1024, 2d);
+// TProtocol protocol = new TBinaryProtocol(transport);
+// schema.write(protocol);
+// binarySchema = transport.getBuf().array();
} catch (Exception e) {
throw new IOException(e);
}
- out.writeInt(binarySchema.length);
- out.write(binarySchema);
+
}
@Override
public void readFields(DataInput in) throws IOException {
+ splitNum = in.readInt();
int length = in.readInt();
planBytes = new byte[length];
in.readFully(planBytes);
@@ -129,14 +130,18 @@ public class LlapInputSplit implements InputSplitWithLocationInfo {
length = in.readInt();
try {
- AutoExpandingBufferWriteTransport transport =
- new AutoExpandingBufferWriteTransport(length, 2d);
- AutoExpandingBuffer buf = transport.getBuf();
- in.readFully(buf.array(), 0, length);
-
- TProtocol protocol = new TBinaryProtocol(transport);
+ byte[] schemaBytes = new byte[length];
+ in.readFully(schemaBytes);
+ TDeserializer tDeserializer = new TDeserializer();
schema = new Schema();
- schema.read(protocol);
+ tDeserializer.deserialize(schema, schemaBytes);
+// AutoExpandingBufferReadTransport transport = new AutoExpandingBufferReadTransport(length, 2d);
+// AutoExpandingBuffer buf = transport.getBuf();
+// in.readFully(buf.array(), 0, length);
+//
+// TProtocol protocol = new TBinaryProtocol(transport);
+// schema = new Schema();
+// schema.read(protocol);
} catch (Exception e) {
throw new IOException(e);
}
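The split now serializes its Schema with Thrift's TSerializer/TDeserializer and a length prefix, replacing the AutoExpandingBufferWriteTransport code, which had no matching read path. A self-contained sketch of the new round trip (the helper class is invented for illustration):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TSerializer;

class SchemaIO {
  // Length-prefixed Thrift serialization, mirroring the new write() above.
  static void writeSchema(DataOutput out, Schema schema) throws IOException {
    try {
      byte[] bytes = new TSerializer().serialize(schema);
      out.writeInt(bytes.length);
      out.write(bytes);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  // Matching read path, mirroring the new readFields() above.
  static Schema readSchema(DataInput in) throws IOException {
    try {
      byte[] bytes = new byte[in.readInt()];
      in.readFully(bytes);
      Schema schema = new Schema();
      new TDeserializer().deserialize(schema, bytes);
      return schema;
    } catch (Exception e) {
      throw new IOException(e);
    }
  }
}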
http://git-wip-us.apache.org/repos/asf/hive/blob/2e042cc1/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java b/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
index a9a3738..83149ab 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/SubmitWorkInfo.java
@@ -6,18 +6,29 @@ import java.io.IOException;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.runtime.api.impl.TaskSpec;
public class SubmitWorkInfo implements Writable {
private TaskSpec taskSpec;
private ApplicationId fakeAppId;
+ private long creationTime;
- public SubmitWorkInfo(TaskSpec taskSpec, ApplicationId fakeAppId) {
+ // This is used to communicate over the LlapUmbilicalProtocol. Not related to tokens used to
+ // talk to LLAP daemons itself via the securit work.
+ private Token<JobTokenIdentifier> token;
+
+ public SubmitWorkInfo(TaskSpec taskSpec, ApplicationId fakeAppId, long creationTime) {
this.taskSpec = taskSpec;
this.fakeAppId = fakeAppId;
+ this.token = createJobToken();
+ this.creationTime = creationTime;
}
// Empty constructor for writable etc.
@@ -32,11 +43,25 @@ public class SubmitWorkInfo implements Writable {
return fakeAppId;
}
+ public String getTokenIdentifier() {
+ return fakeAppId.toString();
+ }
+
+ public Token<JobTokenIdentifier> getToken() {
+ return token;
+ }
+
+ public long getCreationTime() {
+ return creationTime;
+ }
+
@Override
public void write(DataOutput out) throws IOException {
taskSpec.write(out);
out.writeLong(fakeAppId.getClusterTimestamp());
out.writeInt(fakeAppId.getId());
+ token.write(out);
+ out.writeLong(creationTime);
}
@Override
@@ -46,6 +71,9 @@ public class SubmitWorkInfo implements Writable {
long appIdTs = in.readLong();
int appIdId = in.readInt();
fakeAppId = ApplicationId.newInstance(appIdTs, appIdId);
+ token = new Token<>();
+ token.readFields(in);
+ creationTime = in.readLong();
}
public static byte[] toBytes(SubmitWorkInfo submitWorkInfo) throws IOException {
@@ -54,7 +82,7 @@ public class SubmitWorkInfo implements Writable {
return dob.getData();
}
- public SubmitWorkInfo fromBytes(byte[] submitWorkInfoBytes) throws IOException {
+ public static SubmitWorkInfo fromBytes(byte[] submitWorkInfoBytes) throws IOException {
DataInputBuffer dib = new DataInputBuffer();
dib.reset(submitWorkInfoBytes, 0, submitWorkInfoBytes.length);
SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo();
@@ -62,4 +90,14 @@ public class SubmitWorkInfo implements Writable {
return submitWorkInfo;
}
+
+ private Token<JobTokenIdentifier> createJobToken() {
+ String tokenIdentifier = fakeAppId.toString();
+ JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(
+ tokenIdentifier));
+ Token<JobTokenIdentifier> sessionToken = new Token<JobTokenIdentifier>(identifier,
+ new JobTokenSecretManager());
+ sessionToken.setService(identifier.getJobId());
+ return sessionToken;
+ }
}
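Making fromBytes() static matters because callers only hold the raw bytes from the split and have no SubmitWorkInfo instance yet; the job token and creation time are now written and read alongside the original fields. The generic Writable round trip behind toBytes()/fromBytes(), sketched with an illustrative helper:

import java.io.IOException;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;

class WritableRoundTrip {
  // Serialize any Writable to a byte[] via a DataOutputBuffer.
  static byte[] toBytes(Writable w) throws IOException {
    DataOutputBuffer dob = new DataOutputBuffer();
    w.write(dob);
    // Note: getData() can return a buffer longer than getLength(); copy if the exact size matters.
    return dob.getData();
  }

  // Rebuild the object from the bytes; the caller supplies an empty instance.
  static <T extends Writable> T fromBytes(byte[] bytes, T empty) throws IOException {
    DataInputBuffer dib = new DataInputBuffer();
    dib.reset(bytes, 0, bytes.length);
    empty.readFields(dib);
    return empty;
  }
}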
http://git-wip-us.apache.org/repos/asf/hive/blob/2e042cc1/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java
index 9c7e1f2..9fa4aa8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java
@@ -18,131 +18,79 @@
package org.apache.hadoop.hive.ql.udf.generic;
-import org.apache.hadoop.hive.llap.LlapInputSplit;
-import org.apache.hadoop.hive.llap.SubmitWorkInfo;
-
+import javax.security.auth.login.LoginException;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.Set;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
-
-import javax.security.auth.login.LoginException;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.ArrayList;
import java.util.UUID;
-import java.io.Serializable;
-import java.io.IOException;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.DataOutput;
import com.esotericsoftware.kryo.Kryo;
-import java.io.ByteArrayOutputStream;
-import java.io.OutputStream;
-import java.io.InputStream;
-
-import org.apache.hadoop.hive.ql.exec.tez.TezProcessor;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TaskSpecBuilder;
-import org.apache.tez.runtime.api.impl.TaskSpec;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.hive.ql.udf.UDFType;
+import com.google.common.base.Preconditions;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.llap.LlapInputSplit;
+import org.apache.hadoop.hive.llap.LlapOutputFormat;
+import org.apache.hadoop.hive.llap.SubmitWorkInfo;
+import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.QueryPlan;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.tez.TezTask;
-import org.apache.hadoop.hive.ql.plan.TezWork;
-import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.CommandNeedRetryException;
-import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
+import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.MapredContext;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
+import org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.llap.LlapInputFormat;
-import org.apache.hadoop.hive.llap.LlapOutputFormat;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.JobConfigurable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.hadoop.hive.metastore.api.Schema;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URISyntaxException;
-import java.io.FileNotFoundException;
-import java.util.UUID;
-
-import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.commons.io.FilenameUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.SplitLocationInfo;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.runtime.api.Event;
-import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
-import org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator;
-import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.TezWork;
-import org.apache.hadoop.hive.metastore.api.Schema;
-import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
import org.apache.tez.dag.api.TaskLocationHint;
-import org.apache.tez.dag.api.VertexLocationHint;
-import org.apache.tez.dag.api.event.VertexStateUpdate;
-import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
-import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
-import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
-import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
-import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
+import org.apache.tez.dag.api.TaskSpecBuilder;
+import org.apache.tez.dag.api.Vertex;
import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.InputInitializer;
-import org.apache.tez.runtime.api.InputInitializerContext;
-import org.apache.tez.runtime.api.InputSpecUpdate;
import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
-import org.apache.tez.runtime.api.events.InputDataInformationEvent;
-import org.apache.tez.runtime.api.events.InputInitializerEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* GenericUDFGetSplits.
@@ -155,6 +103,8 @@ public class GenericUDFGetSplits extends GenericUDF {
private static final Logger LOG = LoggerFactory.getLogger(GenericUDFGetSplits.class);
+ private static final String LLAP_INTERNAL_INPUT_FORMAT_NAME = "org.apache.hadoop.hive.llap.LlapInputFormat";
+
private transient StringObjectInspector stringOI;
private transient IntObjectInspector intOI;
private final ArrayList<Object> retArray = new ArrayList<Object>();
@@ -190,13 +140,13 @@ public class GenericUDFGetSplits extends GenericUDF {
} else if (!(arguments[0] instanceof StringObjectInspector)) {
LOG.error("Got "+arguments[0].getTypeName()+" instead of string.");
throw new UDFArgumentTypeException(0, "\""
- + "string\" is expected at function GET_SPLITS, " + "but \""
- + arguments[0].getTypeName() + "\" is found");
+ + "string\" is expected at function GET_SPLITS, " + "but \""
+ + arguments[0].getTypeName() + "\" is found");
} else if (!(arguments[1] instanceof IntObjectInspector)) {
LOG.error("Got "+arguments[1].getTypeName()+" instead of int.");
throw new UDFArgumentTypeException(1, "\""
- + "int\" is expected at function GET_SPLITS, " + "but \""
- + arguments[1].getTypeName() + "\" is found");
+ + "int\" is expected at function GET_SPLITS, " + "but \""
+ + arguments[1].getTypeName() + "\" is found");
}
stringOI = (StringObjectInspector) arguments[0];
@@ -204,9 +154,9 @@ public class GenericUDFGetSplits extends GenericUDF {
List<String> names = Arrays.asList("if_class","split_class","split");
List<ObjectInspector> fieldOIs = Arrays.<ObjectInspector>asList(
- PrimitiveObjectInspectorFactory.javaStringObjectInspector,
- PrimitiveObjectInspectorFactory.javaStringObjectInspector,
- PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector);
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+ PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector);
ObjectInspector outputOI = ObjectInspectorFactory.getStandardStructObjectInspector(names, fieldOIs);
ObjectInspector listOI = ObjectInspectorFactory.getStandardListObjectInspector(outputOI);
bos = new ByteArrayOutputStream(1024);
@@ -233,80 +183,85 @@ public class GenericUDFGetSplits extends GenericUDF {
throw new HiveException("Need configuration");
}
- LOG.info("setting fetch.task.conversion to none and query file format to \""+LlapOutputFormat.class.toString()+"\"");
- HiveConf.setVar(conf, ConfVars.HIVEFETCHTASKCONVERSION, "none");
- HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, LlapOutputFormat.class.toString());
+ String fetchTaskConversion = HiveConf.getVar(conf, ConfVars.HIVEFETCHTASKCONVERSION);
+ String queryResultFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
- cpr = driver.compileAndRespond(query);
- if(cpr.getResponseCode() != 0) {
- throw new HiveException("Failed to compile query: "+cpr.getException());
- }
+ try {
+ LOG.info("setting fetch.task.conversion to none and query file format to \""+LlapOutputFormat.class.getName()+"\"");
+ HiveConf.setVar(conf, ConfVars.HIVEFETCHTASKCONVERSION, "none");
+ HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, LlapOutputFormat.class.getName());
- QueryPlan plan = driver.getPlan();
- List<Task<?>> roots = plan.getRootTasks();
- Schema schema = plan.getResultSchema();
+ cpr = driver.compileAndRespond(query);
+ if(cpr.getResponseCode() != 0) {
+ throw new HiveException("Failed to compile query: "+cpr.getException());
+ }
- if (roots == null || roots.size() != 1 || !(roots.get(0) instanceof TezTask)) {
- throw new HiveException("Was expecting a single TezTask.");
- }
+ QueryPlan plan = driver.getPlan();
+ List<Task<?>> roots = plan.getRootTasks();
+ Schema schema = plan.getResultSchema();
- Path data = null;
- String ifc = null;
+ if (roots == null || roots.size() != 1 || !(roots.get(0) instanceof TezTask)) {
+ throw new HiveException("Was expecting a single TezTask.");
+ }
- TezWork tezWork = ((TezTask)roots.get(0)).getWork();
+ Path data = null;
- if (tezWork.getAllWork().size() != 1) {
+ TezWork tezWork = ((TezTask)roots.get(0)).getWork();
- String tableName = "table_"+UUID.randomUUID().toString().replaceAll("[^A-Za-z0-9 ]", "");
+ if (tezWork.getAllWork().size() != 1) {
- String ctas = "create temporary table "+tableName+" as "+query;
- LOG.info("CTAS: "+ctas);
+ String tableName = "table_"+UUID.randomUUID().toString().replaceAll("[^A-Za-z0-9 ]", "");
- try {
- cpr = driver.run(ctas, false);
- } catch(CommandNeedRetryException e) {
- throw new HiveException(e);
- }
+ String ctas = "create temporary table "+tableName+" as "+query;
+ LOG.info("CTAS: "+ctas);
- if(cpr.getResponseCode() != 0) {
- throw new HiveException("Failed to create temp table: " + cpr.getException());
- }
+ try {
+ cpr = driver.run(ctas, false);
+ } catch(CommandNeedRetryException e) {
+ throw new HiveException(e);
+ }
- query = "select * from " + tableName;
- cpr = driver.compileAndRespond(query);
- if(cpr.getResponseCode() != 0) {
- throw new HiveException("Failed to create temp table: "+cpr.getException());
- }
+ if(cpr.getResponseCode() != 0) {
+ throw new HiveException("Failed to create temp table: " + cpr.getException());
+ }
- plan = driver.getPlan();
- roots = plan.getRootTasks();
- schema = plan.getResultSchema();
+ query = "select * from " + tableName;
+ cpr = driver.compileAndRespond(query);
+ if(cpr.getResponseCode() != 0) {
+ throw new HiveException("Failed to create temp table: "+cpr.getException());
+ }
- if (roots == null || roots.size() != 1 || !(roots.get(0) instanceof TezTask)) {
- throw new HiveException("Was expecting a single TezTask.");
- }
+ plan = driver.getPlan();
+ roots = plan.getRootTasks();
+ schema = plan.getResultSchema();
- tezWork = ((TezTask)roots.get(0)).getWork();
- }
+ if (roots == null || roots.size() != 1 || !(roots.get(0) instanceof TezTask)) {
+ throw new HiveException("Was expecting a single TezTask.");
+ }
- MapWork w = (MapWork)tezWork.getAllWork().get(0);
- ifc = LlapInputFormat.class.toString();
+ tezWork = ((TezTask)roots.get(0)).getWork();
+ }
- try {
- for (InputSplit s: getSplits(jc, num, tezWork, schema)) {
- Object[] os = new Object[3];
- os[0] = ifc;
- os[1] = s.getClass().toString();
- bos.reset();
- s.write(dos);
- byte[] frozen = bos.toByteArray();
- os[2] = frozen;
- retArray.add(os);
+ MapWork w = (MapWork)tezWork.getAllWork().get(0);
+
+ try {
+ for (InputSplit s: getSplits(jc, num, tezWork, schema)) {
+ Object[] os = new Object[3];
+ os[0] = LLAP_INTERNAL_INPUT_FORMAT_NAME;
+ os[1] = s.getClass().getName();
+ bos.reset();
+ s.write(dos);
+ byte[] frozen = bos.toByteArray();
+ os[2] = frozen;
+ retArray.add(os);
+ }
+ } catch(Exception e) {
+ throw new HiveException(e);
}
- } catch(Exception e) {
- throw new HiveException(e);
+ } finally {
+ HiveConf.setVar(conf, ConfVars.HIVEFETCHTASKCONVERSION, fetchTaskConversion);
+ HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, queryResultFormat);
}
-
return retArray;
}
@@ -332,6 +287,7 @@ public class GenericUDFGetSplits extends GenericUDF {
dag.addVertex(wx);
utils.addCredentials(mapWork, dag);
+
// we have the dag now proceed to get the splits:
HiveSplitGenerator splitGenerator = new HiveSplitGenerator(null);
Preconditions.checkState(HiveConf.getBoolVar(wxConf,
@@ -342,8 +298,8 @@ public class GenericUDFGetSplits extends GenericUDF {
List<Event> eventList = splitGenerator.initialize();
// hack - just serializing with kryo for now. This needs to be done properly
- InputSplit[] result = new InputSplit[eventList.size()];
- ByteArrayOutputStream bos = new ByteArrayOutputStream(10240);
+ InputSplit[] result = new InputSplit[eventList.size() - 1];
+ DataOutputBuffer dob = new DataOutputBuffer();
InputConfigureVertexTasksEvent configureEvent = (InputConfigureVertexTasksEvent) eventList.get(0);
@@ -351,11 +307,25 @@ public class GenericUDFGetSplits extends GenericUDF {
Preconditions.checkState(hints.size() == eventList.size() -1);
+ LOG.error("DBG: NumEvents=" + eventList.size());
+ LOG.error("DBG: NumSplits=" + result.length);
+
+ ApplicationId fakeApplicationId = ApplicationId.newInstance(Math.abs(new Random().nextInt()), 0);
+ TaskSpec taskSpec =
+ new TaskSpecBuilder().constructTaskSpec(dag, vertexName, eventList.size() - 1, fakeApplicationId);
+
+ SubmitWorkInfo submitWorkInfo =
+ new SubmitWorkInfo(taskSpec, fakeApplicationId, System.currentTimeMillis());
+ EventMetaData sourceMetaData =
+ new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, vertexName,
+ "NULL_VERTEX", null);
+ EventMetaData destinationMetaInfo = new TaskSpecBuilder().getDestingationMetaData(wx);
+
LOG.info("DBG: Number of splits: " + (eventList.size() - 1));
- for (int i = 1 ; i < eventList.size() ; i++) {
+ for (int i = 0; i < eventList.size() - 1; i++) {
// Creating the TezEvent here itself, since it's easy to serialize.
- Event event = eventList.get(i);
- TaskLocationHint hint = hints.get(i-1);
+ Event event = eventList.get(i + 1);
+ TaskLocationHint hint = hints.get(i);
Set<String> hosts = hint.getHosts();
LOG.info("DBG: Using locations: " + hosts.toString());
if (hosts.size() != 1) {
@@ -367,18 +337,17 @@ public class GenericUDFGetSplits extends GenericUDF {
for (String host : hosts) {
locations[j++] = new SplitLocationInfo(host, false);
}
+ TezEvent tezEvent = new TezEvent(event, sourceMetaData, System.currentTimeMillis());
+ tezEvent.setDestinationInfo(destinationMetaInfo);
bos.reset();
- Kryo kryo = SerializationUtilities.borrowKryo();
- SerializationUtilities.serializeObjectByKryo(kryo, event, bos);
- SerializationUtilities.releaseKryo(kryo);
+ dob.reset();
+ tezEvent.write(dob);
- TaskSpec taskSpec = new TaskSpecBuilder().constructTaskSpec(dag, vertexName, eventList.size() - 1);
- ApplicationId fakeApplicationId = ApplicationId.newInstance(new Random().nextInt(), 0);
- SubmitWorkInfo submitWorkInfo = new SubmitWorkInfo(taskSpec, fakeApplicationId);
byte[] submitWorkBytes = SubmitWorkInfo.toBytes(submitWorkInfo);
- result[i-1] = new LlapInputSplit(submitWorkBytes, bos.toByteArray(), locations, schema);
+ result[i] =
+ new LlapInputSplit(i, submitWorkBytes, dob.getData(), locations, schema);
}
return result;
} catch (Exception e) {
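evaluate() now snapshots HIVEFETCHTASKCONVERSION and HIVEQUERYRESULTFILEFORMAT, forces them for split generation, and restores both in a finally block so the session configuration is not left modified even if compilation fails. The save-override-restore pattern in isolation (the helper class and Runnable body are illustrative):

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

class ConfScope {
  // Save the current values, apply the overrides, run the work, then restore.
  static void withOverrides(HiveConf conf, Runnable body) {
    String fetchConversion = HiveConf.getVar(conf, ConfVars.HIVEFETCHTASKCONVERSION);
    String resultFormat = HiveConf.getVar(conf, ConfVars.HIVEQUERYRESULTFILEFORMAT);
    try {
      HiveConf.setVar(conf, ConfVars.HIVEFETCHTASKCONVERSION, "none");
      HiveConf.setVar(conf, ConfVars.HIVEQUERYRESULTFILEFORMAT,
          "org.apache.hadoop.hive.llap.LlapOutputFormat"); // value taken from the diff
      body.run();
    } finally {
      HiveConf.setVar(conf, ConfVars.HIVEFETCHTASKCONVERSION, fetchConversion);
      HiveConf.setVar(conf, ConfVars.HIVEQUERYRESULTFILEFORMAT, resultFormat);
    }
  }
}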
http://git-wip-us.apache.org/repos/asf/hive/blob/2e042cc1/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java b/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java
index d0c7c5a..5cabb6a 100644
--- a/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java
+++ b/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java
@@ -4,6 +4,12 @@ import java.util.ArrayList;
import java.util.List;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TaskSpec;
@@ -11,7 +17,7 @@ import org.apache.tez.runtime.api.impl.TaskSpec;
// Proxy class within the tez.api package to access package private methods.
public class TaskSpecBuilder {
- public TaskSpec constructTaskSpec(DAG dag, String vertexName, int numSplits) {
+ public TaskSpec constructTaskSpec(DAG dag, String vertexName, int numSplits, ApplicationId appId) {
Vertex vertex = dag.getVertex(vertexName);
ProcessorDescriptor processorDescriptor = vertex.getProcessorDescriptor();
List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputs =
@@ -35,11 +41,22 @@ public class TaskSpecBuilder {
outputSpecs.add(outputSpec);
}
- TaskSpec taskSpec = TaskSpec
- .createBaseTaskSpec(dag.getName(), vertexName, numSplits, processorDescriptor, inputSpecs,
- outputSpecs, null);
+ TezDAGID dagId = TezDAGID.getInstance(appId, 0);
+ TezVertexID vertexId = TezVertexID.getInstance(dagId, 0);
+ TezTaskID taskId = TezTaskID.getInstance(vertexId, 0);
+ TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 0);
+ return new TaskSpec(taskAttemptId, dag.getName(), vertexName, numSplits, processorDescriptor, inputSpecs, outputSpecs, null);
+ }
- return taskSpec;
+ public EventMetaData getDestingationMetaData(Vertex vertex) {
+ List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputs =
+ vertex.getInputs();
+ Preconditions.checkState(inputs.size() == 1);
+ String inputName = inputs.get(0).getName();
+ EventMetaData destMeta =
+ new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, vertex.getName(),
+ inputName, null);
+ return destMeta;
}
}
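constructTaskSpec() now fabricates a complete attempt id chain from the fake ApplicationId (DAG 0, vertex 0, task 0, attempt 0) so the externally submitted TaskSpec carries a usable TezTaskAttemptID instead of a null one. The same chain in isolation (the helper class is invented for illustration):

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;

class FakeAttemptIds {
  // ApplicationId -> DAG 0 -> vertex 0 -> task 0 -> attempt 0, as in the diff above.
  static TezTaskAttemptID firstAttempt(ApplicationId appId) {
    TezDAGID dagId = TezDAGID.getInstance(appId, 0);
    TezVertexID vertexId = TezVertexID.getInstance(dagId, 0);
    TezTaskID taskId = TezTaskID.getInstance(vertexId, 0);
    return TezTaskAttemptID.getInstance(taskId, 0);
  }
}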