Posted to commits@hive.apache.org by jd...@apache.org on 2016/04/20 00:10:15 UTC
hive git commit: HIVE-13550: Get rid of wrapped LlapInputSplit/InputFormat classes
Repository: hive
Updated Branches:
refs/heads/llap 7b9096a92 -> 0afaa8f6d
HIVE-13550: Get rid of wrapped LlapInputSplit/InputFormat classes
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0afaa8f6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0afaa8f6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0afaa8f6
Branch: refs/heads/llap
Commit: 0afaa8f6dc60d51a01ba8085b1cb89624eafd3d0
Parents: 7b9096a
Author: Jason Dere <jd...@hortonworks.com>
Authored: Tue Apr 19 15:09:33 2016 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Tue Apr 19 15:09:33 2016 -0700
----------------------------------------------------------------------
.../hive/llap/ext/TestLlapInputSplit.java | 26 +-
.../hadoop/hive/llap/LlapInputFormat.java | 392 ----
llap-ext-client/pom.xml | 33 +
.../hadoop/hive/llap/LlapBaseInputFormat.java | 351 ++-
.../hadoop/hive/llap/LlapRowInputFormat.java | 4 +-
.../apache/hive/llap/ext/LlapInputSplit.java | 73 -
.../hadoop/hive/ql/exec/FunctionRegistry.java | 1 -
.../udf/generic/GenericUDTFExecuteSplits.java | 124 -
.../ql/udf/generic/GenericUDTFGetSplits.java | 10 +-
.../queries/clientpositive/udtf_get_splits.q | 43 -
.../clientpositive/llap/udtf_get_splits.q.out | 2130 ------------------
.../clientpositive/tez/udf_get_splits.q.out | 73 -
12 files changed, 389 insertions(+), 2871 deletions(-)
----------------------------------------------------------------------
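
For client code, the effect of this change is that the JDBC-side wrapper split (org.apache.hive.llap.ext.LlapInputSplit) is gone and org.apache.hadoop.hive.llap.LlapInputSplit is used everywhere directly. A minimal sketch of the before/after, assuming the llap jars on the classpath (the helper method below is illustrative, not part of the patch):

import org.apache.hadoop.hive.llap.LlapInputSplit;
import org.apache.hadoop.mapred.InputSplit;

public class WrapperRemovalSketch {
  static LlapInputSplit asLlapSplit(InputSplit split) {
    // Before this commit: unwrap the JDBC wrapper first, e.g.
    //   ((org.apache.hive.llap.ext.LlapInputSplit<Text>) split).getSplit()
    // After this commit: the split is already the native class.
    return (LlapInputSplit) split;
  }
}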
http://git-wip-us.apache.org/repos/asf/hive/blob/0afaa8f6/itests/hive-unit/src/test/java/org/apache/hadoop/hive/llap/ext/TestLlapInputSplit.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/llap/ext/TestLlapInputSplit.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/llap/ext/TestLlapInputSplit.java
index 04da17e..8264190 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/llap/ext/TestLlapInputSplit.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/llap/ext/TestLlapInputSplit.java
@@ -10,6 +10,7 @@ import java.util.HashMap;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hive.llap.LlapInputSplit;
import org.apache.hadoop.hive.llap.Schema;
import org.apache.hadoop.hive.llap.FieldDesc;
import org.apache.hadoop.hive.llap.TypeDesc;
@@ -40,7 +41,7 @@ public class TestLlapInputSplit {
colDescs.add(new FieldDesc("col2", new TypeDesc(TypeDesc.Type.INT)));
Schema schema = new Schema(colDescs);
- org.apache.hadoop.hive.llap.LlapInputSplit split1 = new org.apache.hadoop.hive.llap.LlapInputSplit(
+ LlapInputSplit split1 = new LlapInputSplit(
splitNum,
planBytes,
fragmentBytes,
@@ -52,35 +53,18 @@ public class TestLlapInputSplit {
split1.write(dataOut);
ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
DataInputStream dataIn = new DataInputStream(byteInStream);
- org.apache.hadoop.hive.llap.LlapInputSplit split2 = new org.apache.hadoop.hive.llap.LlapInputSplit();
+ LlapInputSplit split2 = new LlapInputSplit();
split2.readFields(dataIn);
// Did we read all the data?
assertEquals(0, byteInStream.available());
checkLlapSplits(split1, split2);
-
- // Try JDBC LlapInputSplits
- org.apache.hive.llap.ext.LlapInputSplit<Text> jdbcSplit1 =
- new org.apache.hive.llap.ext.LlapInputSplit<Text>(split1, "org.apache.hadoop.hive.llap.LlapInputFormat");
- byteOutStream.reset();
- jdbcSplit1.write(dataOut);
- byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
- dataIn = new DataInputStream(byteInStream);
- org.apache.hive.llap.ext.LlapInputSplit<Text> jdbcSplit2 = new org.apache.hive.llap.ext.LlapInputSplit<Text>();
- jdbcSplit2.readFields(dataIn);
-
- assertEquals(0, byteInStream.available());
-
- checkLlapSplits(
- (org.apache.hadoop.hive.llap.LlapInputSplit) jdbcSplit1.getSplit(),
- (org.apache.hadoop.hive.llap.LlapInputSplit) jdbcSplit2.getSplit());
- assertEquals(jdbcSplit1.getInputFormat().getClass(), jdbcSplit2.getInputFormat().getClass());
}
static void checkLlapSplits(
- org.apache.hadoop.hive.llap.LlapInputSplit split1,
- org.apache.hadoop.hive.llap.LlapInputSplit split2) throws Exception {
+ LlapInputSplit split1,
+ LlapInputSplit split2) throws Exception {
assertEquals(split1.getSplitNum(), split2.getSplitNum());
assertArrayEquals(split1.getPlanBytes(), split2.getPlanBytes());
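
The test above is the standard Hadoop Writable round trip: serialize the split to bytes, deserialize into a fresh instance, and check that the stream is fully consumed and the fields survive. A self-contained sketch of the same pattern, with a toy Writable standing in for LlapInputSplit:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

class ToySplit implements Writable {
  int splitNum;
  byte[] planBytes = new byte[0];

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeInt(splitNum);
    out.writeInt(planBytes.length);
    out.write(planBytes);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    splitNum = in.readInt();
    planBytes = new byte[in.readInt()];
    in.readFully(planBytes);
  }

  public static void main(String[] args) throws IOException {
    ToySplit split1 = new ToySplit();
    split1.splitNum = 42;
    split1.planBytes = new byte[] { 1, 2, 3 };

    ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
    split1.write(new DataOutputStream(byteOut));

    ByteArrayInputStream byteIn = new ByteArrayInputStream(byteOut.toByteArray());
    ToySplit split2 = new ToySplit();
    split2.readFields(new DataInputStream(byteIn));

    // Same assertions as the test: nothing left unread, fields equal.
    if (byteIn.available() != 0 || split2.splitNum != split1.splitNum) {
      throw new AssertionError("round trip failed");
    }
  }
}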
http://git-wip-us.apache.org/repos/asf/hive/blob/0afaa8f6/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
deleted file mode 100644
index 0930d60..0000000
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
+++ /dev/null
@@ -1,392 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.llap;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
-
-import org.apache.commons.collections4.ListUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.LlapBaseRecordReader.ReaderEvent;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
-import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient;
-import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient.LlapTaskUmbilicalExternalResponder;
-import org.apache.hadoop.hive.llap.registry.ServiceInstance;
-import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
-import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
-import org.apache.hadoop.hive.llap.tez.Converters;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.tez.common.security.JobTokenIdentifier;
-import org.apache.tez.common.security.TokenCache;
-import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
-import org.apache.tez.runtime.api.impl.TaskSpec;
-import org.apache.tez.runtime.api.impl.TezEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.runtime.api.impl.EventType;
-import org.apache.tez.runtime.api.impl.TezEvent;
-import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
-import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
-
-
-public class LlapInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> {
-
- private static final Logger LOG = LoggerFactory.getLogger(LlapInputFormat.class);
-
- public LlapInputFormat() {
- }
-
- /*
- * This proxy record reader has the duty of establishing a connected socket with LLAP, then fire
- * off the work in the split to LLAP and finally return the connected socket back in an
- * LlapRecordReader. The LlapRecordReader class reads the results from the socket.
- */
- @Override
- public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job,
- Reporter reporter) throws IOException {
-
- LlapInputSplit llapSplit = (LlapInputSplit) split;
-
- // Set conf to use LLAP user rather than current user for LLAP Zk registry.
- HiveConf.setVar(job, HiveConf.ConfVars.LLAP_ZK_REGISTRY_USER, llapSplit.getLlapUser());
- SubmitWorkInfo submitWorkInfo = SubmitWorkInfo.fromBytes(llapSplit.getPlanBytes());
-
- ServiceInstance serviceInstance = getServiceInstance(job, llapSplit);
- String host = serviceInstance.getHost();
- int llapSubmitPort = serviceInstance.getRpcPort();
-
- LOG.info("Found service instance for host " + host + " with rpc port " + llapSubmitPort
- + " and outputformat port " + serviceInstance.getOutputFormatPort());
-
- LlapRecordReaderTaskUmbilicalExternalResponder umbilicalResponder =
- new LlapRecordReaderTaskUmbilicalExternalResponder();
- LlapTaskUmbilicalExternalClient llapClient =
- new LlapTaskUmbilicalExternalClient(job, submitWorkInfo.getTokenIdentifier(),
- submitWorkInfo.getToken(), umbilicalResponder);
- llapClient.init(job);
- llapClient.start();
-
- SubmitWorkRequestProto submitWorkRequestProto =
- constructSubmitWorkRequestProto(submitWorkInfo, llapSplit.getSplitNum(),
- llapClient.getAddress(), submitWorkInfo.getToken());
-
- TezEvent tezEvent = new TezEvent();
- DataInputBuffer dib = new DataInputBuffer();
- dib.reset(llapSplit.getFragmentBytes(), 0, llapSplit.getFragmentBytes().length);
- tezEvent.readFields(dib);
- List<TezEvent> tezEventList = Lists.newArrayList();
- tezEventList.add(tezEvent);
-
- llapClient.submitWork(submitWorkRequestProto, host, llapSubmitPort, tezEventList);
-
- String id = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID) + "_" + llapSplit.getSplitNum();
-
- HiveConf conf = new HiveConf();
- Socket socket = new Socket(host,
- serviceInstance.getOutputFormatPort());
-
- LOG.debug("Socket connected");
-
- socket.getOutputStream().write(id.getBytes());
- socket.getOutputStream().write(0);
- socket.getOutputStream().flush();
-
- LOG.info("Registered id: " + id);
-
- LlapBaseRecordReader recordReader = new LlapBaseRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class);
- umbilicalResponder.setRecordReader(recordReader);
- return recordReader;
- }
-
- @Override
- public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
- throw new IOException("These are not the splits you are looking for.");
- }
-
- private ServiceInstance getServiceInstance(JobConf job, LlapInputSplit llapSplit) throws IOException {
- LlapRegistryService registryService = LlapRegistryService.getClient(job);
- String host = llapSplit.getLocations()[0];
-
- ServiceInstance serviceInstance = getServiceInstanceForHost(registryService, host);
- if (serviceInstance == null) {
- throw new IOException("No service instances found for " + host + " in registry");
- }
-
- return serviceInstance;
- }
-
- private ServiceInstance getServiceInstanceForHost(LlapRegistryService registryService, String host) throws IOException {
- InetAddress address = InetAddress.getByName(host);
- ServiceInstanceSet instanceSet = registryService.getInstances();
- ServiceInstance serviceInstance = null;
-
- // The name used in the service registry may not match the host name we're using.
- // Try hostname/canonical hostname/host address
-
- String name = address.getHostName();
- LOG.info("Searching service instance by hostname " + name);
- serviceInstance = selectServiceInstance(instanceSet.getByHost(name));
- if (serviceInstance != null) {
- return serviceInstance;
- }
-
- name = address.getCanonicalHostName();
- LOG.info("Searching service instance by canonical hostname " + name);
- serviceInstance = selectServiceInstance(instanceSet.getByHost(name));
- if (serviceInstance != null) {
- return serviceInstance;
- }
-
- name = address.getHostAddress();
- LOG.info("Searching service instance by address " + name);
- serviceInstance = selectServiceInstance(instanceSet.getByHost(name));
- if (serviceInstance != null) {
- return serviceInstance;
- }
-
- return serviceInstance;
- }
-
- private ServiceInstance selectServiceInstance(Set<ServiceInstance> serviceInstances) {
- if (serviceInstances == null || serviceInstances.isEmpty()) {
- return null;
- }
-
- // Get the first live service instance
- for (ServiceInstance serviceInstance : serviceInstances) {
- if (serviceInstance.isAlive()) {
- return serviceInstance;
- }
- }
-
- LOG.info("No live service instances were found");
- return null;
- }
-
- private SubmitWorkRequestProto constructSubmitWorkRequestProto(SubmitWorkInfo submitWorkInfo,
- int taskNum,
- InetSocketAddress address,
- Token<JobTokenIdentifier> token) throws
- IOException {
- TaskSpec taskSpec = submitWorkInfo.getTaskSpec();
- ApplicationId appId = submitWorkInfo.getFakeAppId();
-
- SubmitWorkRequestProto.Builder builder = SubmitWorkRequestProto.newBuilder();
- // This works, assuming the executor is running within YARN.
- LOG.info("Setting user in submitWorkRequest to: " +
- System.getenv(ApplicationConstants.Environment.USER.name()));
- builder.setUser(System.getenv(ApplicationConstants.Environment.USER.name()));
- builder.setApplicationIdString(appId.toString());
- builder.setAppAttemptNumber(0);
- builder.setTokenIdentifier(appId.toString());
-
- ContainerId containerId =
- ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, 0), taskNum);
- builder.setContainerIdString(containerId.toString());
-
- builder.setAmHost(address.getHostName());
- builder.setAmPort(address.getPort());
- Credentials taskCredentials = new Credentials();
- // Credentials can change across DAGs. Ideally construct only once per DAG.
- // TODO Figure out where credentials will come from. Normally Hive sets up
- // URLs on the tez dag, for which Tez acquires credentials.
-
- // taskCredentials.addAll(getContext().getCredentials());
-
- // Preconditions.checkState(currentQueryIdentifierProto.getDagIdentifier() ==
- // taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId());
- // ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto);
- // if (credentialsBinary == null) {
- // credentialsBinary = serializeCredentials(getContext().getCredentials());
- // credentialMap.putIfAbsent(currentQueryIdentifierProto, credentialsBinary.duplicate());
- // } else {
- // credentialsBinary = credentialsBinary.duplicate();
- // }
- // builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
- Credentials credentials = new Credentials();
- TokenCache.setSessionToken(token, credentials);
- ByteBuffer credentialsBinary = serializeCredentials(credentials);
- builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
-
-
- builder.setFragmentSpec(Converters.convertTaskSpecToProto(taskSpec));
-
- FragmentRuntimeInfo.Builder runtimeInfo = FragmentRuntimeInfo.newBuilder();
- runtimeInfo.setCurrentAttemptStartTime(System.currentTimeMillis());
- runtimeInfo.setWithinDagPriority(0);
- runtimeInfo.setDagStartTime(submitWorkInfo.getCreationTime());
- runtimeInfo.setFirstAttemptStartTime(submitWorkInfo.getCreationTime());
- runtimeInfo.setNumSelfAndUpstreamTasks(taskSpec.getVertexParallelism());
- runtimeInfo.setNumSelfAndUpstreamCompletedTasks(0);
-
-
- builder.setUsingTezAm(false);
- builder.setFragmentRuntimeInfo(runtimeInfo.build());
- return builder.build();
- }
-
- private ByteBuffer serializeCredentials(Credentials credentials) throws IOException {
- Credentials containerCredentials = new Credentials();
- containerCredentials.addAll(credentials);
- DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
- containerCredentials.writeTokenStorageToStream(containerTokens_dob);
- return ByteBuffer.wrap(containerTokens_dob.getData(), 0, containerTokens_dob.getLength());
- }
-
- private static class LlapRecordReaderTaskUmbilicalExternalResponder implements LlapTaskUmbilicalExternalResponder {
- protected LlapBaseRecordReader recordReader = null;
- protected LinkedBlockingQueue<ReaderEvent> queuedEvents = new LinkedBlockingQueue<ReaderEvent>();
-
- public LlapRecordReaderTaskUmbilicalExternalResponder() {
- }
-
- @Override
- public void submissionFailed(String fragmentId, Throwable throwable) {
- try {
- sendOrQueueEvent(ReaderEvent.errorEvent(
- "Received submission failed event for fragment ID " + fragmentId));
- } catch (Exception err) {
- LOG.error("Error during heartbeat responder:", err);
- }
- }
-
- @Override
- public void heartbeat(TezHeartbeatRequest request) {
- TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID();
- List<TezEvent> inEvents = request.getEvents();
- for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
- EventType eventType = tezEvent.getEventType();
- try {
- switch (eventType) {
- case TASK_ATTEMPT_COMPLETED_EVENT:
- sendOrQueueEvent(ReaderEvent.doneEvent());
- break;
- case TASK_ATTEMPT_FAILED_EVENT:
- TaskAttemptFailedEvent taskFailedEvent = (TaskAttemptFailedEvent) tezEvent.getEvent();
- sendOrQueueEvent(ReaderEvent.errorEvent(taskFailedEvent.getDiagnostics()));
- break;
- case TASK_STATUS_UPDATE_EVENT:
- // If we want to handle counters
- break;
- default:
- LOG.warn("Unhandled event type " + eventType);
- break;
- }
- } catch (Exception err) {
- LOG.error("Error during heartbeat responder:", err);
- }
- }
- }
-
- @Override
- public void taskKilled(TezTaskAttemptID taskAttemptId) {
- try {
- sendOrQueueEvent(ReaderEvent.errorEvent(
- "Received task killed event for task ID " + taskAttemptId));
- } catch (Exception err) {
- LOG.error("Error during heartbeat responder:", err);
- }
- }
-
- @Override
- public void heartbeatTimeout(String taskAttemptId) {
- try {
- sendOrQueueEvent(ReaderEvent.errorEvent(
- "Timed out waiting for heartbeat for task ID " + taskAttemptId));
- } catch (Exception err) {
- LOG.error("Error during heartbeat responder:", err);
- }
- }
-
- public synchronized LlapBaseRecordReader getRecordReader() {
- return recordReader;
- }
-
- public synchronized void setRecordReader(LlapBaseRecordReader recordReader) {
- this.recordReader = recordReader;
-
- if (recordReader == null) {
- return;
- }
-
- // If any events were queued by the responder, give them to the record reader now.
- while (!queuedEvents.isEmpty()) {
- ReaderEvent readerEvent = queuedEvents.poll();
- LOG.debug("Sending queued event to record reader: " + readerEvent.getEventType());
- recordReader.handleEvent(readerEvent);
- }
- }
-
- /**
- * Send the ReaderEvents to the record reader, if it is registered to this responder.
- * If there is no registered record reader, add them to a list of pending reader events
- * since we don't want to drop these events.
- * @param readerEvent
- */
- protected synchronized void sendOrQueueEvent(ReaderEvent readerEvent) {
- LlapBaseRecordReader recordReader = getRecordReader();
- if (recordReader != null) {
- recordReader.handleEvent(readerEvent);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("No registered record reader, queueing event " + readerEvent.getEventType()
- + " with message " + readerEvent.getMessage());
- }
-
- try {
- queuedEvents.put(readerEvent);
- } catch (Exception err) {
- throw new RuntimeException("Unexpected exception while queueing reader event", err);
- }
- }
- }
-
- /**
- * Clear the list of queued reader events if we are not interested in sending any pending events to any registering record reader.
- */
- public void clearQueuedEvents() {
- queuedEvents.clear();
- }
- }
-}
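
The getRecordReader() logic in the file deleted above (moved into LlapBaseInputFormat below) talks to the daemon's output format service with a very small handshake: open a socket to the output format port, write the fragment id followed by a single 0 byte, flush, and then read result rows from that same socket. A sketch of just the handshake, with a placeholder host, port, and id:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

public class OutputFormatHandshakeSketch {
  public static void main(String[] args) throws IOException {
    // In the real code the id is queryId + "_" + splitNum, and host/port come
    // from the registry's ServiceInstance; these values are placeholders.
    String id = "some_query_id_0";
    try (Socket socket = new Socket("localhost", 15003)) {
      OutputStream out = socket.getOutputStream();
      out.write(id.getBytes());
      out.write(0); // zero-byte terminator, as in getRecordReader()
      out.flush();
      // getRecordReader() hands this stream to LlapBaseRecordReader, which
      // decodes rows against the split's Schema.
      InputStream in = socket.getInputStream();
      System.out.println("first byte of response: " + in.read());
    }
  }
}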
http://git-wip-us.apache.org/repos/asf/hive/blob/0afaa8f6/llap-ext-client/pom.xml
----------------------------------------------------------------------
diff --git a/llap-ext-client/pom.xml b/llap-ext-client/pom.xml
index 5a7e385..fdf16cd 100644
--- a/llap-ext-client/pom.xml
+++ b/llap-ext-client/pom.xml
@@ -74,6 +74,39 @@
<version>${hadoop.version}</version>
<optional>true</optional>
</dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-api</artifactId>
+ <version>${tez.version}</version>
+ <optional>true</optional>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-runtime-internals</artifactId>
+ <version>${tez.version}</version>
+ <optional>true</optional>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
<!-- test inter-project -->
<dependency>
<groupId>junit</groupId>
http://git-wip-us.apache.org/repos/asf/hive/blob/0afaa8f6/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index 61eb2ea..10d14c0 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -18,6 +18,8 @@ package org.apache.hadoop.hive.llap;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
import java.sql.SQLException;
import java.sql.Connection;
@@ -30,8 +32,28 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.DataInputStream;
import java.io.ByteArrayInputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.collections4.ListUtils;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapBaseRecordReader;
+import org.apache.hadoop.hive.llap.LlapBaseRecordReader.ReaderEvent;
+import org.apache.hadoop.hive.llap.LlapInputSplit;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient;
+import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient.LlapTaskUmbilicalExternalResponder;
+import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.llap.tez.Converters;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.NullWritable;
@@ -44,16 +66,39 @@ import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Progressable;
-import org.apache.hive.llap.ext.LlapInputSplit;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
+import org.apache.tez.runtime.api.impl.EventType;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
+import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+
public class LlapBaseInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> {
+ private static final Logger LOG = LoggerFactory.getLogger(LlapBaseInputFormat.class);
+
private static String driverName = "org.apache.hive.jdbc.HiveDriver";
private String url; // "jdbc:hive2://localhost:10000/default"
private String user; // "hive",
@@ -82,8 +127,58 @@ public class LlapBaseInputFormat<V extends WritableComparable> implements InputF
@Override
public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
+
LlapInputSplit llapSplit = (LlapInputSplit) split;
- return llapSplit.getInputFormat().getRecordReader(llapSplit.getSplit(), job, reporter);
+
+ // Set conf to use LLAP user rather than current user for LLAP Zk registry.
+ HiveConf.setVar(job, HiveConf.ConfVars.LLAP_ZK_REGISTRY_USER, llapSplit.getLlapUser());
+ SubmitWorkInfo submitWorkInfo = SubmitWorkInfo.fromBytes(llapSplit.getPlanBytes());
+
+ ServiceInstance serviceInstance = getServiceInstance(job, llapSplit);
+ String host = serviceInstance.getHost();
+ int llapSubmitPort = serviceInstance.getRpcPort();
+
+ LOG.info("Found service instance for host " + host + " with rpc port " + llapSubmitPort
+ + " and outputformat port " + serviceInstance.getOutputFormatPort());
+
+ LlapRecordReaderTaskUmbilicalExternalResponder umbilicalResponder =
+ new LlapRecordReaderTaskUmbilicalExternalResponder();
+ LlapTaskUmbilicalExternalClient llapClient =
+ new LlapTaskUmbilicalExternalClient(job, submitWorkInfo.getTokenIdentifier(),
+ submitWorkInfo.getToken(), umbilicalResponder);
+ llapClient.init(job);
+ llapClient.start();
+
+ SubmitWorkRequestProto submitWorkRequestProto =
+ constructSubmitWorkRequestProto(submitWorkInfo, llapSplit.getSplitNum(),
+ llapClient.getAddress(), submitWorkInfo.getToken());
+
+ TezEvent tezEvent = new TezEvent();
+ DataInputBuffer dib = new DataInputBuffer();
+ dib.reset(llapSplit.getFragmentBytes(), 0, llapSplit.getFragmentBytes().length);
+ tezEvent.readFields(dib);
+ List<TezEvent> tezEventList = Lists.newArrayList();
+ tezEventList.add(tezEvent);
+
+ llapClient.submitWork(submitWorkRequestProto, host, llapSubmitPort, tezEventList);
+
+ String id = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID) + "_" + llapSplit.getSplitNum();
+
+ HiveConf conf = new HiveConf();
+ Socket socket = new Socket(host,
+ serviceInstance.getOutputFormatPort());
+
+ LOG.debug("Socket connected");
+
+ socket.getOutputStream().write(id.getBytes());
+ socket.getOutputStream().write(0);
+ socket.getOutputStream().flush();
+
+ LOG.info("Registered id: " + id);
+
+ LlapBaseRecordReader recordReader = new LlapBaseRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class);
+ umbilicalResponder.setRecordReader(recordReader);
+ return recordReader;
}
@Override
@@ -112,10 +207,10 @@ public class LlapBaseInputFormat<V extends WritableComparable> implements InputF
ResultSet res = stmt.executeQuery(sql);
while (res.next()) {
// deserialize split
- DataInput in = new DataInputStream(res.getBinaryStream(3));
- InputSplitWithLocationInfo is = (InputSplitWithLocationInfo)Class.forName(res.getString(2)).newInstance();
+ DataInput in = new DataInputStream(res.getBinaryStream(1));
+ InputSplitWithLocationInfo is = new LlapInputSplit();
is.readFields(in);
- ins.add(new LlapInputSplit(is, res.getString(1)));
+ ins.add(is);
}
res.close();
@@ -133,4 +228,250 @@ public class LlapBaseInputFormat<V extends WritableComparable> implements InputF
// ignore
}
}
+
+ private ServiceInstance getServiceInstance(JobConf job, LlapInputSplit llapSplit) throws IOException {
+ LlapRegistryService registryService = LlapRegistryService.getClient(job);
+ String host = llapSplit.getLocations()[0];
+
+ ServiceInstance serviceInstance = getServiceInstanceForHost(registryService, host);
+ if (serviceInstance == null) {
+ throw new IOException("No service instances found for " + host + " in registry");
+ }
+
+ return serviceInstance;
+ }
+
+ private ServiceInstance getServiceInstanceForHost(LlapRegistryService registryService, String host) throws IOException {
+ InetAddress address = InetAddress.getByName(host);
+ ServiceInstanceSet instanceSet = registryService.getInstances();
+ ServiceInstance serviceInstance = null;
+
+ // The name used in the service registry may not match the host name we're using.
+ // Try hostname/canonical hostname/host address
+
+ String name = address.getHostName();
+ LOG.info("Searching service instance by hostname " + name);
+ serviceInstance = selectServiceInstance(instanceSet.getByHost(name));
+ if (serviceInstance != null) {
+ return serviceInstance;
+ }
+
+ name = address.getCanonicalHostName();
+ LOG.info("Searching service instance by canonical hostname " + name);
+ serviceInstance = selectServiceInstance(instanceSet.getByHost(name));
+ if (serviceInstance != null) {
+ return serviceInstance;
+ }
+
+ name = address.getHostAddress();
+ LOG.info("Searching service instance by address " + name);
+ serviceInstance = selectServiceInstance(instanceSet.getByHost(name));
+ if (serviceInstance != null) {
+ return serviceInstance;
+ }
+
+ return serviceInstance;
+ }
+
+ private ServiceInstance selectServiceInstance(Set<ServiceInstance> serviceInstances) {
+ if (serviceInstances == null || serviceInstances.isEmpty()) {
+ return null;
+ }
+
+ // Get the first live service instance
+ for (ServiceInstance serviceInstance : serviceInstances) {
+ if (serviceInstance.isAlive()) {
+ return serviceInstance;
+ }
+ }
+
+ LOG.info("No live service instances were found");
+ return null;
+ }
+
+ private SubmitWorkRequestProto constructSubmitWorkRequestProto(SubmitWorkInfo submitWorkInfo,
+ int taskNum,
+ InetSocketAddress address,
+ Token<JobTokenIdentifier> token) throws
+ IOException {
+ TaskSpec taskSpec = submitWorkInfo.getTaskSpec();
+ ApplicationId appId = submitWorkInfo.getFakeAppId();
+
+ SubmitWorkRequestProto.Builder builder = SubmitWorkRequestProto.newBuilder();
+ // This works, assuming the executor is running within YARN.
+ LOG.info("Setting user in submitWorkRequest to: " +
+ System.getenv(ApplicationConstants.Environment.USER.name()));
+ builder.setUser(System.getenv(ApplicationConstants.Environment.USER.name()));
+ builder.setApplicationIdString(appId.toString());
+ builder.setAppAttemptNumber(0);
+ builder.setTokenIdentifier(appId.toString());
+
+ ContainerId containerId =
+ ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, 0), taskNum);
+ builder.setContainerIdString(containerId.toString());
+
+ builder.setAmHost(address.getHostName());
+ builder.setAmPort(address.getPort());
+ Credentials taskCredentials = new Credentials();
+ // Credentials can change across DAGs. Ideally construct only once per DAG.
+ // TODO Figure out where credentials will come from. Normally Hive sets up
+ // URLs on the tez dag, for which Tez acquires credentials.
+
+ // taskCredentials.addAll(getContext().getCredentials());
+
+ // Preconditions.checkState(currentQueryIdentifierProto.getDagIdentifier() ==
+ // taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId());
+ // ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto);
+ // if (credentialsBinary == null) {
+ // credentialsBinary = serializeCredentials(getContext().getCredentials());
+ // credentialMap.putIfAbsent(currentQueryIdentifierProto, credentialsBinary.duplicate());
+ // } else {
+ // credentialsBinary = credentialsBinary.duplicate();
+ // }
+ // builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
+ Credentials credentials = new Credentials();
+ TokenCache.setSessionToken(token, credentials);
+ ByteBuffer credentialsBinary = serializeCredentials(credentials);
+ builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
+
+
+ builder.setFragmentSpec(Converters.convertTaskSpecToProto(taskSpec));
+
+ FragmentRuntimeInfo.Builder runtimeInfo = FragmentRuntimeInfo.newBuilder();
+ runtimeInfo.setCurrentAttemptStartTime(System.currentTimeMillis());
+ runtimeInfo.setWithinDagPriority(0);
+ runtimeInfo.setDagStartTime(submitWorkInfo.getCreationTime());
+ runtimeInfo.setFirstAttemptStartTime(submitWorkInfo.getCreationTime());
+ runtimeInfo.setNumSelfAndUpstreamTasks(taskSpec.getVertexParallelism());
+ runtimeInfo.setNumSelfAndUpstreamCompletedTasks(0);
+
+
+ builder.setUsingTezAm(false);
+ builder.setFragmentRuntimeInfo(runtimeInfo.build());
+ return builder.build();
+ }
+
+ private ByteBuffer serializeCredentials(Credentials credentials) throws IOException {
+ Credentials containerCredentials = new Credentials();
+ containerCredentials.addAll(credentials);
+ DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
+ containerCredentials.writeTokenStorageToStream(containerTokens_dob);
+ return ByteBuffer.wrap(containerTokens_dob.getData(), 0, containerTokens_dob.getLength());
+ }
+
+ private static class LlapRecordReaderTaskUmbilicalExternalResponder implements LlapTaskUmbilicalExternalResponder {
+ protected LlapBaseRecordReader recordReader = null;
+ protected LinkedBlockingQueue<ReaderEvent> queuedEvents = new LinkedBlockingQueue<ReaderEvent>();
+
+ public LlapRecordReaderTaskUmbilicalExternalResponder() {
+ }
+
+ @Override
+ public void submissionFailed(String fragmentId, Throwable throwable) {
+ try {
+ sendOrQueueEvent(ReaderEvent.errorEvent(
+ "Received submission failed event for fragment ID " + fragmentId));
+ } catch (Exception err) {
+ LOG.error("Error during heartbeat responder:", err);
+ }
+ }
+
+ @Override
+ public void heartbeat(TezHeartbeatRequest request) {
+ TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID();
+ List<TezEvent> inEvents = request.getEvents();
+ for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
+ EventType eventType = tezEvent.getEventType();
+ try {
+ switch (eventType) {
+ case TASK_ATTEMPT_COMPLETED_EVENT:
+ sendOrQueueEvent(ReaderEvent.doneEvent());
+ break;
+ case TASK_ATTEMPT_FAILED_EVENT:
+ TaskAttemptFailedEvent taskFailedEvent = (TaskAttemptFailedEvent) tezEvent.getEvent();
+ sendOrQueueEvent(ReaderEvent.errorEvent(taskFailedEvent.getDiagnostics()));
+ break;
+ case TASK_STATUS_UPDATE_EVENT:
+ // If we want to handle counters
+ break;
+ default:
+ LOG.warn("Unhandled event type " + eventType);
+ break;
+ }
+ } catch (Exception err) {
+ LOG.error("Error during heartbeat responder:", err);
+ }
+ }
+ }
+
+ @Override
+ public void taskKilled(TezTaskAttemptID taskAttemptId) {
+ try {
+ sendOrQueueEvent(ReaderEvent.errorEvent(
+ "Received task killed event for task ID " + taskAttemptId));
+ } catch (Exception err) {
+ LOG.error("Error during heartbeat responder:", err);
+ }
+ }
+
+ @Override
+ public void heartbeatTimeout(String taskAttemptId) {
+ try {
+ sendOrQueueEvent(ReaderEvent.errorEvent(
+ "Timed out waiting for heartbeat for task ID " + taskAttemptId));
+ } catch (Exception err) {
+ LOG.error("Error during heartbeat responder:", err);
+ }
+ }
+
+ public synchronized LlapBaseRecordReader getRecordReader() {
+ return recordReader;
+ }
+
+ public synchronized void setRecordReader(LlapBaseRecordReader recordReader) {
+ this.recordReader = recordReader;
+
+ if (recordReader == null) {
+ return;
+ }
+
+ // If any events were queued by the responder, give them to the record reader now.
+ while (!queuedEvents.isEmpty()) {
+ ReaderEvent readerEvent = queuedEvents.poll();
+ LOG.debug("Sending queued event to record reader: " + readerEvent.getEventType());
+ recordReader.handleEvent(readerEvent);
+ }
+ }
+
+ /**
+ * Send the ReaderEvents to the record reader, if it is registered to this responder.
+ * If there is no registered record reader, add them to a list of pending reader events
+ * since we don't want to drop these events.
+ * @param readerEvent
+ */
+ protected synchronized void sendOrQueueEvent(ReaderEvent readerEvent) {
+ LlapBaseRecordReader recordReader = getRecordReader();
+ if (recordReader != null) {
+ recordReader.handleEvent(readerEvent);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No registered record reader, queueing event " + readerEvent.getEventType()
+ + " with message " + readerEvent.getMessage());
+ }
+
+ try {
+ queuedEvents.put(readerEvent);
+ } catch (Exception err) {
+ throw new RuntimeException("Unexpected exception while queueing reader event", err);
+ }
+ }
+ }
+
+ /**
+ * Clear the list of queued reader events if we are not interested in sending any pending events to any registering record reader.
+ */
+ public void clearQueuedEvents() {
+ queuedEvents.clear();
+ }
+ }
}
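
One detail worth calling out in the code above: the registry may know a node under a different name than the client uses, so getServiceInstanceForHost() retries the lookup with the resolved hostname, the canonical hostname, and the raw address in turn. The same fallback pattern in isolation, with a generic lookup function standing in for instanceSet.getByHost():

import java.io.IOException;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.function.Function;

public class HostLookupSketch {
  static <T> T findByAnyName(String host, Function<String, T> lookup)
      throws IOException {
    InetAddress address = InetAddress.getByName(host);
    for (String name : Arrays.asList(
        address.getHostName(),
        address.getCanonicalHostName(),
        address.getHostAddress())) {
      T result = lookup.apply(name);
      if (result != null) {
        return result; // first form the registry recognizes wins
      }
    }
    return null;
  }

  public static void main(String[] args) throws IOException {
    // Toy lookup that only recognizes the raw loopback address form.
    String found = findByAnyName("localhost",
        name -> "127.0.0.1".equals(name) ? "instance@" + name : null);
    System.out.println(found);
  }
}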
http://git-wip-us.apache.org/repos/asf/hive/blob/0afaa8f6/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java
index 6ecb0f9..56ad555 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapRowInputFormat.java
@@ -3,6 +3,7 @@ package org.apache.hadoop.hive.llap;
import java.io.IOException;
import org.apache.hadoop.hive.llap.LlapBaseRecordReader;
+import org.apache.hadoop.hive.llap.LlapInputSplit;
import org.apache.hadoop.hive.llap.LlapRowRecordReader;
import org.apache.hadoop.hive.llap.Row;
import org.apache.hadoop.hive.llap.Schema;
@@ -15,7 +16,6 @@ import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hive.llap.ext.LlapInputSplit;
public class LlapRowInputFormat implements InputFormat<NullWritable, Row> {
@@ -29,7 +29,7 @@ public class LlapRowInputFormat implements InputFormat<NullWritable, Row> {
@Override
public RecordReader<NullWritable, Row> getRecordReader(InputSplit split, JobConf job, Reporter reporter)
throws IOException {
- LlapInputSplit<Text> llapSplit = (LlapInputSplit<Text>) split;
+ LlapInputSplit llapSplit = (LlapInputSplit) split;
LlapBaseRecordReader<Text> reader = (LlapBaseRecordReader<Text>) baseInputFormat.getRecordReader(llapSplit, job, reporter);
return new LlapRowRecordReader(job, reader.getSchema(), reader);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0afaa8f6/llap-ext-client/src/java/org/apache/hive/llap/ext/LlapInputSplit.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hive/llap/ext/LlapInputSplit.java b/llap-ext-client/src/java/org/apache/hive/llap/ext/LlapInputSplit.java
deleted file mode 100644
index d8881c4..0000000
--- a/llap-ext-client/src/java/org/apache/hive/llap/ext/LlapInputSplit.java
+++ /dev/null
@@ -1,73 +0,0 @@
-package org.apache.hive.llap.ext;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
-import org.apache.hadoop.mapred.SplitLocationInfo;
-
-
-public class LlapInputSplit<V extends WritableComparable> implements InputSplitWithLocationInfo {
- InputSplitWithLocationInfo nativeSplit;
- String inputFormatClassName;
-
- public LlapInputSplit() {
- }
-
- public LlapInputSplit(InputSplitWithLocationInfo nativeSplit, String inputFormatClassName) {
- this.nativeSplit = nativeSplit;
- this.inputFormatClassName = inputFormatClassName;
- }
-
- @Override
- public long getLength() throws IOException {
- return nativeSplit.getLength();
- }
-
- @Override
- public String[] getLocations() throws IOException {
- return nativeSplit.getLocations();
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeUTF(inputFormatClassName);
- out.writeUTF(nativeSplit.getClass().getName());
- nativeSplit.write(out);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- inputFormatClassName = in.readUTF();
- String splitClass = in.readUTF();
- try {
- nativeSplit = (InputSplitWithLocationInfo)Class.forName(splitClass).newInstance();
- } catch (Exception e) {
- throw new IOException(e);
- }
- nativeSplit.readFields(in);
- }
-
- @Override
- public SplitLocationInfo[] getLocationInfo() throws IOException {
- return nativeSplit.getLocationInfo();
- }
-
- public InputSplit getSplit() {
- return nativeSplit;
- }
-
- public InputFormat<NullWritable, V> getInputFormat() throws IOException {
- try {
- return (InputFormat<NullWritable, V>) Class.forName(inputFormatClassName)
- .newInstance();
- } catch(Exception e) {
- throw new IOException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/0afaa8f6/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
index 54bd830..6b25ce1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
@@ -447,7 +447,6 @@ public final class FunctionRegistry {
system.registerGenericUDTF("posexplode", GenericUDTFPosExplode.class);
system.registerGenericUDTF("stack", GenericUDTFStack.class);
system.registerGenericUDTF("get_splits", GenericUDTFGetSplits.class);
- system.registerGenericUDTF("execute_splits", GenericUDTFExecuteSplits.class);
//PTF declarations
system.registerGenericUDF(LEAD_FUNC_NAME, GenericUDFLead.class);
http://git-wip-us.apache.org/repos/asf/hive/blob/0afaa8f6/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExecuteSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExecuteSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExecuteSplits.java
deleted file mode 100644
index 12759ab..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExecuteSplits.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.udf.generic;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.hadoop.hive.ql.exec.Description;
-import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
-import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
-import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.udf.UDFType;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDTFGetSplits.PlanFragment;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.RecordReader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * GenericUDTFExecuteSplits.
- *
- */
-@Description(name = "execute_splits", value = "_FUNC_(string,int) - "
- + "Returns an array of length int serialized splits for the referenced tables string.")
-@UDFType(deterministic = false)
-public class GenericUDTFExecuteSplits extends GenericUDTFGetSplits {
-
- private static final Logger LOG = LoggerFactory.getLogger(GenericUDTFExecuteSplits.class);
-
- @Override
- public StructObjectInspector initialize(ObjectInspector[] arguments)
- throws UDFArgumentException {
-
- LOG.debug("initializing ExecuteSplits");
-
- if (SessionState.get() == null || SessionState.get().getConf() == null) {
- throw new IllegalStateException("Cannot run execute splits outside HS2");
- }
-
- if (arguments.length != 2) {
- throw new UDFArgumentLengthException("The function execute_splits accepts 2 arguments.");
- } else if (!(arguments[0] instanceof StringObjectInspector)) {
- LOG.error("Got "+arguments[0].getTypeName()+" instead of string.");
- throw new UDFArgumentTypeException(0, "\""
- + "string\" is expected at function execute_splits, " + "but \""
- + arguments[0].getTypeName() + "\" is found");
- } else if (!(arguments[1] instanceof IntObjectInspector)) {
- LOG.error("Got "+arguments[1].getTypeName()+" instead of int.");
- throw new UDFArgumentTypeException(1, "\""
- + "int\" is expected at function execute_splits, " + "but \""
- + arguments[1].getTypeName() + "\" is found");
- }
-
- stringOI = (StringObjectInspector) arguments[0];
- intOI = (IntObjectInspector) arguments[1];
-
- List<String> names = Arrays.asList("split_num","value");
- List<ObjectInspector> fieldOIs = Arrays.<ObjectInspector>asList(
- PrimitiveObjectInspectorFactory.javaIntObjectInspector,
- PrimitiveObjectInspectorFactory.javaStringObjectInspector);
- StructObjectInspector outputOI = ObjectInspectorFactory.getStandardStructObjectInspector(names, fieldOIs);
-
- LOG.debug("done initializing GenericUDTFExecuteSplits");
- return outputOI;
- }
-
- @Override
- public void process(Object[] arguments) throws HiveException {
-
- String query = stringOI.getPrimitiveJavaObject(arguments[0]);
- int num = intOI.get(arguments[1]);
-
- PlanFragment fragment = createPlanFragment(query, num);
- try {
- InputFormat<NullWritable, Text> format = (InputFormat<NullWritable,Text>)(Class.forName("org.apache.hadoop.hive.llap.LlapInputFormat").newInstance());
- int index = 0;
- for (InputSplit s: getSplits(jc, num, fragment.work, fragment.schema)) {
- RecordReader<NullWritable, Text> reader = format.getRecordReader(s,fragment.jc,null);
- Text value = reader.createValue();
- NullWritable key = reader.createKey();
- index++;
- while(reader.next(key,value)) {
- Object[] os = new Object[2];
- os[0] = index;
- os[1] = value.toString();
- forward(os);
- }
- }
- } catch(Exception e) {
- throw new HiveException(e);
- }
- }
-
- @Override
- public void close() throws HiveException {
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/0afaa8f6/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index 51027a7..9a52c7d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -150,10 +150,8 @@ public class GenericUDTFGetSplits extends GenericUDTF {
stringOI = (StringObjectInspector) arguments[0];
intOI = (IntObjectInspector) arguments[1];
- List<String> names = Arrays.asList("if_class","split_class","split");
+ List<String> names = Arrays.asList("split");
List<ObjectInspector> fieldOIs = Arrays.<ObjectInspector>asList(
- PrimitiveObjectInspectorFactory.javaStringObjectInspector,
- PrimitiveObjectInspectorFactory.javaStringObjectInspector,
PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector);
StructObjectInspector outputOI = ObjectInspectorFactory.getStandardStructObjectInspector(names, fieldOIs);
@@ -185,13 +183,11 @@ public class GenericUDTFGetSplits extends GenericUDTF {
try {
for (InputSplit s: getSplits(jc, num, tezWork, schema)) {
- Object[] os = new Object[3];
- os[0] = LLAP_INTERNAL_INPUT_FORMAT_NAME;
- os[1] = s.getClass().getName();
+ Object[] os = new Object[1];
bos.reset();
s.write(dos);
byte[] frozen = bos.toByteArray();
- os[2] = frozen;
+ os[0] = frozen;
forward(os);
}
} catch(Exception e) {
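
With the UDTF now emitting a single binary split column, a client deserializes each row straight into LlapInputSplit, exactly as the getSplits() change in LlapBaseInputFormat above does. A sketch of that consumption over JDBC (url, credentials, and query text are placeholders):

import java.io.DataInput;
import java.io.DataInputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import org.apache.hadoop.hive.llap.LlapInputSplit;

public class GetSplitsClientSketch {
  public static void main(String[] args) throws Exception {
    Class.forName("org.apache.hive.jdbc.HiveDriver");
    String url = "jdbc:hive2://localhost:10000/default";
    String sql =
        "select get_splits(\"select key from srcpart where key % 2 = 0\", 5)";
    try (Connection conn = DriverManager.getConnection(url, "hive", "");
         Statement stmt = conn.createStatement();
         ResultSet res = stmt.executeQuery(sql)) {
      while (res.next()) {
        // Column 1 is the serialized split; the if_class/split_class columns
        // removed by this patch no longer exist.
        DataInput in = new DataInputStream(res.getBinaryStream(1));
        LlapInputSplit split = new LlapInputSplit();
        split.readFields(in);
        System.out.println("read split " + split.getSplitNum());
      }
    }
  }
}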
http://git-wip-us.apache.org/repos/asf/hive/blob/0afaa8f6/ql/src/test/queries/clientpositive/udtf_get_splits.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/udtf_get_splits.q b/ql/src/test/queries/clientpositive/udtf_get_splits.q
deleted file mode 100644
index f378dca..0000000
--- a/ql/src/test/queries/clientpositive/udtf_get_splits.q
+++ /dev/null
@@ -1,43 +0,0 @@
-set hive.fetch.task.conversion=more;
-set hive.mapred.mode=nonstrict;
-set mapred.max.split.size=100;
-set mapred.min.split.size.per.node=100;
-set mapred.min.split.size.per.rack=100;
-set mapred.max.split.size=100;
-set tez.grouping.max-size=100;
-set tez.grouping.min-size=100;
-
-DESCRIBE FUNCTION get_splits;
-DESCRIBE FUNCTION execute_splits;
-
-select r1, r2, floor(length(r3)/100000)
-from
- (select
- get_splits(
- "select key, count(*) from srcpart where key % 2 = 0 group by key",
- 5) as (r1, r2, r3)) t;
-
-select r1, r2, floor(length(r3)/100000)
-from
- (select
- get_splits(
- "select key from srcpart where key % 2 = 0",
- 5) as (r1, r2, r3)) t;
-
-show tables;
-
-select r1, r2
-from
- (select
- execute_splits(
- "select key from srcpart where key % 2 = 0",
- 1) as (r1, r2)) t;
-
-select r1, r2
-from
- (select
- execute_splits(
- "select key from srcpart where key % 2 = 0",
- 5) as (r1, r2)) t;
-
-select count(*) from (select key from srcpart where key % 2 = 0) t;
http://git-wip-us.apache.org/repos/asf/hive/blob/0afaa8f6/ql/src/test/results/clientpositive/llap/udtf_get_splits.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/udtf_get_splits.q.out b/ql/src/test/results/clientpositive/llap/udtf_get_splits.q.out
deleted file mode 100644
index 2f17a91..0000000
--- a/ql/src/test/results/clientpositive/llap/udtf_get_splits.q.out
+++ /dev/null
@@ -1,2130 +0,0 @@
-PREHOOK: query: DESCRIBE FUNCTION get_splits
-PREHOOK: type: DESCFUNCTION
-POSTHOOK: query: DESCRIBE FUNCTION get_splits
-POSTHOOK: type: DESCFUNCTION
-get_splits(string,int) - Returns an array of length int serialized splits for the referenced tables string.
-PREHOOK: query: DESCRIBE FUNCTION execute_splits
-PREHOOK: type: DESCFUNCTION
-POSTHOOK: query: DESCRIBE FUNCTION execute_splits
-POSTHOOK: type: DESCFUNCTION
-execute_splits(string,int) - Returns an array of length int serialized splits for the referenced tables string.
-PREHOOK: query: select r1, r2, floor(length(r3)/100000)
-from
- (select
- get_splits(
- "select key, count(*) from srcpart where key % 2 = 0 group by key",
- 5) as (r1, r2, r3)) t
-PREHOOK: type: QUERY
-PREHOOK: Input: _dummy_database@_dummy_table
-#### A masked pattern was here ####
-POSTHOOK: query: select r1, r2, floor(length(r3)/100000)
-from
- (select
- get_splits(
- "select key, count(*) from srcpart where key % 2 = 0 group by key",
- 5) as (r1, r2, r3)) t
-POSTHOOK: type: QUERY
-POSTHOOK: Input: _dummy_database@_dummy_table
-#### A masked pattern was here ####
-PREHOOK: query: select r1, r2, floor(length(r3)/100000)
-from
- (select
- get_splits(
- "select key, count(*) from srcpart where key % 2 = 0 group by key",
- 5) as (r1, r2, r3)) t
-PREHOOK: type: QUERY
-PREHOOK: Input: default@srcpart
-PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
-PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
-PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
-PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
-PREHOOK: Output: database:default
-PREHOOK: Output: default@#### A masked pattern was here ####
-POSTHOOK: query: select r1, r2, floor(length(r3)/100000)
-from
- (select
- get_splits(
- "select key, count(*) from srcpart where key % 2 = 0 group by key",
- 5) as (r1, r2, r3)) t
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@srcpart
-POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
-POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
-POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
-POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
-POSTHOOK: Output: database:default
-POSTHOOK: Output: default@#### A masked pattern was here ####
-org.apache.hadoop.hive.llap.LlapInputFormat org.apache.hadoop.hive.llap.LlapInputSplit 1
-PREHOOK: query: select r1, r2, floor(length(r3)/100000)
-from
- (select
- get_splits(
- "select key from srcpart where key % 2 = 0",
- 5) as (r1, r2, r3)) t
-PREHOOK: type: QUERY
-PREHOOK: Input: _dummy_database@_dummy_table
-#### A masked pattern was here ####
-POSTHOOK: query: select r1, r2, floor(length(r3)/100000)
-from
- (select
- get_splits(
- "select key from srcpart where key % 2 = 0",
- 5) as (r1, r2, r3)) t
-POSTHOOK: type: QUERY
-POSTHOOK: Input: _dummy_database@_dummy_table
-#### A masked pattern was here ####
-org.apache.hadoop.hive.llap.LlapInputFormat org.apache.hadoop.hive.llap.LlapInputSplit 1
-org.apache.hadoop.hive.llap.LlapInputFormat org.apache.hadoop.hive.llap.LlapInputSplit 1
-org.apache.hadoop.hive.llap.LlapInputFormat org.apache.hadoop.hive.llap.LlapInputSplit 1
-org.apache.hadoop.hive.llap.LlapInputFormat org.apache.hadoop.hive.llap.LlapInputSplit 1
-PREHOOK: query: show tables
-PREHOOK: type: SHOWTABLES
-PREHOOK: Input: database:default
-POSTHOOK: query: show tables
-POSTHOOK: type: SHOWTABLES
-POSTHOOK: Input: database:default
-alltypesorc
-cbo_t1
-cbo_t2
-cbo_t3
-lineitem
-part
-src
-src1
-src_cbo
-src_json
-src_sequencefile
-src_thrift
-srcbucket
-srcbucket2
-srcpart
-#### A masked pattern was here ####
-PREHOOK: query: select r1, r2
-from
- (select
- execute_splits(
- "select key from srcpart where key % 2 = 0",
- 1) as (r1, r2)) t
-PREHOOK: type: QUERY
-PREHOOK: Input: _dummy_database@_dummy_table
-#### A masked pattern was here ####
-POSTHOOK: query: select r1, r2
-from
- (select
- execute_splits(
- "select key from srcpart where key % 2 = 0",
- 1) as (r1, r2)) t
-POSTHOOK: type: QUERY
-POSTHOOK: Input: _dummy_database@_dummy_table
-#### A masked pattern was here ####
-1 238
-1 86
-1 278
-1 98
-1 484
-1 150
-1 224
-1 66
-1 128
-1 146
-1 406
-1 374
-1 152
-1 82
-1 166
-1 430
-1 252
-1 292
-1 338
-1 446
-1 394
-1 482
-1 174
-1 494
-1 466
-1 208
-1 174
-1 396
-1 162
-1 266
-1 342
-1 0
-1 128
-1 316
-1 302
-1 438
-1 170
-1 20
-1 378
-1 92
-1 72
-1 4
-1 280
-1 208
-1 356
-1 382
-1 498
-1 386
-1 192
-1 286
-1 176
-1 54
-1 138
-1 216
-1 430
-1 278
-1 176
-1 318
-1 332
-1 180
-1 284
-1 12
-1 230
-1 260
-1 404
-1 384
-1 272
-1 138
-1 84
-1 348
-1 466
-1 58
-1 8
-1 230
-1 208
-1 348
-1 24
-1 172
-1 42
-1 158
-1 496
-1 0
-1 322
-1 468
-1 454
-1 100
-1 298
-1 418
-1 96
-1 26
-1 230
-1 120
-1 404
-1 436
-1 156
-1 468
-1 308
-1 196
-1 288
-1 98
-1 282
-1 318
-1 318
-1 470
-1 316
-1 0
-1 490
-1 364
-1 118
-1 134
-1 282
-1 138
-1 238
-1 118
-1 72
-1 90
-1 10
-1 306
-1 224
-1 242
-1 392
-1 272
-1 242
-1 452
-1 226
-1 402
-1 396
-1 58
-1 336
-1 168
-1 34
-1 472
-1 322
-1 498
-1 160
-1 42
-1 430
-1 458
-1 78
-1 76
-1 492
-1 218
-1 228
-1 138
-1 30
-1 64
-1 468
-1 76
-1 74
-1 342
-1 230
-1 368
-1 296
-1 216
-1 344
-1 274
-1 116
-1 256
-1 70
-1 480
-1 288
-1 244
-1 438
-1 128
-1 432
-1 202
-1 316
-1 280
-1 2
-1 80
-1 44
-1 104
-1 466
-1 366
-1 406
-1 190
-1 406
-1 114
-1 258
-1 90
-1 262
-1 348
-1 424
-1 12
-1 396
-1 164
-1 454
-1 478
-1 298
-1 164
-1 424
-1 382
-1 70
-1 480
-1 24
-1 104
-1 70
-1 438
-1 414
-1 200
-1 360
-1 248
-1 444
-1 120
-1 230
-1 478
-1 178
-1 468
-1 310
-1 460
-1 480
-1 136
-1 172
-1 214
-1 462
-1 406
-1 454
-1 384
-1 256
-1 26
-1 134
-1 384
-1 18
-1 462
-1 492
-1 100
-1 298
-1 498
-1 146
-1 458
-1 362
-1 186
-1 348
-1 18
-1 344
-1 84
-1 28
-1 448
-1 152
-1 348
-1 194
-1 414
-1 222
-1 126
-1 90
-1 400
-1 200
-2 238
-2 86
-2 278
-2 98
-2 484
-2 150
-2 224
-2 66
-2 128
-2 146
-2 406
-2 374
-2 152
-2 82
-2 166
-2 430
-2 252
-2 292
-2 338
-2 446
-2 394
-2 482
-2 174
-2 494
-2 466
-2 208
-2 174
-2 396
-2 162
-2 266
-2 342
-2 0
-2 128
-2 316
-2 302
-2 438
-2 170
-2 20
-2 378
-2 92
-2 72
-2 4
-2 280
-2 208
-2 356
-2 382
-2 498
-2 386
-2 192
-2 286
-2 176
-2 54
-2 138
-2 216
-2 430
-2 278
-2 176
-2 318
-2 332
-2 180
-2 284
-2 12
-2 230
-2 260
-2 404
-2 384
-2 272
-2 138
-2 84
-2 348
-2 466
-2 58
-2 8
-2 230
-2 208
-2 348
-2 24
-2 172
-2 42
-2 158
-2 496
-2 0
-2 322
-2 468
-2 454
-2 100
-2 298
-2 418
-2 96
-2 26
-2 230
-2 120
-2 404
-2 436
-2 156
-2 468
-2 308
-2 196
-2 288
-2 98
-2 282
-2 318
-2 318
-2 470
-2 316
-2 0
-2 490
-2 364
-2 118
-2 134
-2 282
-2 138
-2 238
-2 118
-2 72
-2 90
-2 10
-2 306
-2 224
-2 242
-2 392
-2 272
-2 242
-2 452
-2 226
-2 402
-2 396
-2 58
-2 336
-2 168
-2 34
-2 472
-2 322
-2 498
-2 160
-2 42
-2 430
-2 458
-2 78
-2 76
-2 492
-2 218
-2 228
-2 138
-2 30
-2 64
-2 468
-2 76
-2 74
-2 342
-2 230
-2 368
-2 296
-2 216
-2 344
-2 274
-2 116
-2 256
-2 70
-2 480
-2 288
-2 244
-2 438
-2 128
-2 432
-2 202
-2 316
-2 280
-2 2
-2 80
-2 44
-2 104
-2 466
-2 366
-2 406
-2 190
-2 406
-2 114
-2 258
-2 90
-2 262
-2 348
-2 424
-2 12
-2 396
-2 164
-2 454
-2 478
-2 298
-2 164
-2 424
-2 382
-2 70
-2 480
-2 24
-2 104
-2 70
-2 438
-2 414
-2 200
-2 360
-2 248
-2 444
-2 120
-2 230
-2 478
-2 178
-2 468
-2 310
-2 460
-2 480
-2 136
-2 172
-2 214
-2 462
-2 406
-2 454
-2 384
-2 256
-2 26
-2 134
-2 384
-2 18
-2 462
-2 492
-2 100
-2 298
-2 498
-2 146
-2 458
-2 362
-2 186
-2 348
-2 18
-2 344
-2 84
-2 28
-2 448
-2 152
-2 348
-2 194
-2 414
-2 222
-2 126
-2 90
-2 400
-2 200
-3 238
-3 86
-3 278
-3 98
-3 484
-3 150
-3 224
-3 66
-3 128
-3 146
-3 406
-3 374
-3 152
-3 82
-3 166
-3 430
-3 252
-3 292
-3 338
-3 446
-3 394
-3 482
-3 174
-3 494
-3 466
-3 208
-3 174
-3 396
-3 162
-3 266
-3 342
-3 0
-3 128
-3 316
-3 302
-3 438
-3 170
-3 20
-3 378
-3 92
-3 72
-3 4
-3 280
-3 208
-3 356
-3 382
-3 498
-3 386
-3 192
-3 286
-3 176
-3 54
-3 138
-3 216
-3 430
-3 278
-3 176
-3 318
-3 332
-3 180
-3 284
-3 12
-3 230
-3 260
-3 404
-3 384
-3 272
-3 138
-3 84
-3 348
-3 466
-3 58
-3 8
-3 230
-3 208
-3 348
-3 24
-3 172
-3 42
-3 158
-3 496
-3 0
-3 322
-3 468
-3 454
-3 100
-3 298
-3 418
-3 96
-3 26
-3 230
-3 120
-3 404
-3 436
-3 156
-3 468
-3 308
-3 196
-3 288
-3 98
-3 282
-3 318
-3 318
-3 470
-3 316
-3 0
-3 490
-3 364
-3 118
-3 134
-3 282
-3 138
-3 238
-3 118
-3 72
-3 90
-3 10
-3 306
-3 224
-3 242
-3 392
-3 272
-3 242
-3 452
-3 226
-3 402
-3 396
-3 58
-3 336
-3 168
-3 34
-3 472
-3 322
-3 498
-3 160
-3 42
-3 430
-3 458
-3 78
-3 76
-3 492
-3 218
-3 228
-3 138
-3 30
-3 64
-3 468
-3 76
-3 74
-3 342
-3 230
-3 368
-3 296
-3 216
-3 344
-3 274
-3 116
-3 256
-3 70
-3 480
-3 288
-3 244
-3 438
-3 128
-3 432
-3 202
-3 316
-3 280
-3 2
-3 80
-3 44
-3 104
-3 466
-3 366
-3 406
-3 190
-3 406
-3 114
-3 258
-3 90
-3 262
-3 348
-3 424
-3 12
-3 396
-3 164
-3 454
-3 478
-3 298
-3 164
-3 424
-3 382
-3 70
-3 480
-3 24
-3 104
-3 70
-3 438
-3 414
-3 200
-3 360
-3 248
-3 444
-3 120
-3 230
-3 478
-3 178
-3 468
-3 310
-3 460
-3 480
-3 136
-3 172
-3 214
-3 462
-3 406
-3 454
-3 384
-3 256
-3 26
-3 134
-3 384
-3 18
-3 462
-3 492
-3 100
-3 298
-3 498
-3 146
-3 458
-3 362
-3 186
-3 348
-3 18
-3 344
-3 84
-3 28
-3 448
-3 152
-3 348
-3 194
-3 414
-3 222
-3 126
-3 90
-3 400
-3 200
-4 238
-4 86
-4 278
-4 98
-4 484
-4 150
-4 224
-4 66
-4 128
-4 146
-4 406
-4 374
-4 152
-4 82
-4 166
-4 430
-4 252
-4 292
-4 338
-4 446
-4 394
-4 482
-4 174
-4 494
-4 466
-4 208
-4 174
-4 396
-4 162
-4 266
-4 342
-4 0
-4 128
-4 316
-4 302
-4 438
-4 170
-4 20
-4 378
-4 92
-4 72
-4 4
-4 280
-4 208
-4 356
-4 382
-4 498
-4 386
-4 192
-4 286
-4 176
-4 54
-4 138
-4 216
-4 430
-4 278
-4 176
-4 318
-4 332
-4 180
-4 284
-4 12
-4 230
-4 260
-4 404
-4 384
-4 272
-4 138
-4 84
-4 348
-4 466
-4 58
-4 8
-4 230
-4 208
-4 348
-4 24
-4 172
-4 42
-4 158
-4 496
-4 0
-4 322
-4 468
-4 454
-4 100
-4 298
-4 418
-4 96
-4 26
-4 230
-4 120
-4 404
-4 436
-4 156
-4 468
-4 308
-4 196
-4 288
-4 98
-4 282
-4 318
-4 318
-4 470
-4 316
-4 0
-4 490
-4 364
-4 118
-4 134
-4 282
-4 138
-4 238
-4 118
-4 72
-4 90
-4 10
-4 306
-4 224
-4 242
-4 392
-4 272
-4 242
-4 452
-4 226
-4 402
-4 396
-4 58
-4 336
-4 168
-4 34
-4 472
-4 322
-4 498
-4 160
-4 42
-4 430
-4 458
-4 78
-4 76
-4 492
-4 218
-4 228
-4 138
-4 30
-4 64
-4 468
-4 76
-4 74
-4 342
-4 230
-4 368
-4 296
-4 216
-4 344
-4 274
-4 116
-4 256
-4 70
-4 480
-4 288
-4 244
-4 438
-4 128
-4 432
-4 202
-4 316
-4 280
-4 2
-4 80
-4 44
-4 104
-4 466
-4 366
-4 406
-4 190
-4 406
-4 114
-4 258
-4 90
-4 262
-4 348
-4 424
-4 12
-4 396
-4 164
-4 454
-4 478
-4 298
-4 164
-4 424
-4 382
-4 70
-4 480
-4 24
-4 104
-4 70
-4 438
-4 414
-4 200
-4 360
-4 248
-4 444
-4 120
-4 230
-4 478
-4 178
-4 468
-4 310
-4 460
-4 480
-4 136
-4 172
-4 214
-4 462
-4 406
-4 454
-4 384
-4 256
-4 26
-4 134
-4 384
-4 18
-4 462
-4 492
-4 100
-4 298
-4 498
-4 146
-4 458
-4 362
-4 186
-4 348
-4 18
-4 344
-4 84
-4 28
-4 448
-4 152
-4 348
-4 194
-4 414
-4 222
-4 126
-4 90
-4 400
-4 200
-PREHOOK: query: select r1, r2
-from
- (select
- execute_splits(
- "select key from srcpart where key % 2 = 0",
- 5) as (r1, r2)) t
-PREHOOK: type: QUERY
-PREHOOK: Input: _dummy_database@_dummy_table
-#### A masked pattern was here ####
-POSTHOOK: query: select r1, r2
-from
- (select
- execute_splits(
- "select key from srcpart where key % 2 = 0",
- 5) as (r1, r2)) t
-POSTHOOK: type: QUERY
-POSTHOOK: Input: _dummy_database@_dummy_table
-#### A masked pattern was here ####
-1 238
-1 86
-1 278
-1 98
-1 484
-1 150
-1 224
-1 66
-1 128
-1 146
-1 406
-1 374
-1 152
-1 82
-1 166
-1 430
-1 252
-1 292
-1 338
-1 446
-1 394
-1 482
-1 174
-1 494
-1 466
-1 208
-1 174
-1 396
-1 162
-1 266
-1 342
-1 0
-1 128
-1 316
-1 302
-1 438
-1 170
-1 20
-1 378
-1 92
-1 72
-1 4
-1 280
-1 208
-1 356
-1 382
-1 498
-1 386
-1 192
-1 286
-1 176
-1 54
-1 138
-1 216
-1 430
-1 278
-1 176
-1 318
-1 332
-1 180
-1 284
-1 12
-1 230
-1 260
-1 404
-1 384
-1 272
-1 138
-1 84
-1 348
-1 466
-1 58
-1 8
-1 230
-1 208
-1 348
-1 24
-1 172
-1 42
-1 158
-1 496
-1 0
-1 322
-1 468
-1 454
-1 100
-1 298
-1 418
-1 96
-1 26
-1 230
-1 120
-1 404
-1 436
-1 156
-1 468
-1 308
-1 196
-1 288
-1 98
-1 282
-1 318
-1 318
-1 470
-1 316
-1 0
-1 490
-1 364
-1 118
-1 134
-1 282
-1 138
-1 238
-1 118
-1 72
-1 90
-1 10
-1 306
-1 224
-1 242
-1 392
-1 272
-1 242
-1 452
-1 226
-1 402
-1 396
-1 58
-1 336
-1 168
-1 34
-1 472
-1 322
-1 498
-1 160
-1 42
-1 430
-1 458
-1 78
-1 76
-1 492
-1 218
-1 228
-1 138
-1 30
-1 64
-1 468
-1 76
-1 74
-1 342
-1 230
-1 368
-1 296
-1 216
-1 344
-1 274
-1 116
-1 256
-1 70
-1 480
-1 288
-1 244
-1 438
-1 128
-1 432
-1 202
-1 316
-1 280
-1 2
-1 80
-1 44
-1 104
-1 466
-1 366
-1 406
-1 190
-1 406
-1 114
-1 258
-1 90
-1 262
-1 348
-1 424
-1 12
-1 396
-1 164
-1 454
-1 478
-1 298
-1 164
-1 424
-1 382
-1 70
-1 480
-1 24
-1 104
-1 70
-1 438
-1 414
-1 200
-1 360
-1 248
-1 444
-1 120
-1 230
-1 478
-1 178
-1 468
-1 310
-1 460
-1 480
-1 136
-1 172
-1 214
-1 462
-1 406
-1 454
-1 384
-1 256
-1 26
-1 134
-1 384
-1 18
-1 462
-1 492
-1 100
-1 298
-1 498
-1 146
-1 458
-1 362
-1 186
-1 348
-1 18
-1 344
-1 84
-1 28
-1 448
-1 152
-1 348
-1 194
-1 414
-1 222
-1 126
-1 90
-1 400
-1 200
-2 238
-2 86
-2 278
-2 98
-2 484
-2 150
-2 224
-2 66
-2 128
-2 146
-2 406
-2 374
-2 152
-2 82
-2 166
-2 430
-2 252
-2 292
-2 338
-2 446
-2 394
-2 482
-2 174
-2 494
-2 466
-2 208
-2 174
-2 396
-2 162
-2 266
-2 342
-2 0
-2 128
-2 316
-2 302
-2 438
-2 170
-2 20
-2 378
-2 92
-2 72
-2 4
-2 280
-2 208
-2 356
-2 382
-2 498
-2 386
-2 192
-2 286
-2 176
-2 54
-2 138
-2 216
-2 430
-2 278
-2 176
-2 318
-2 332
-2 180
-2 284
-2 12
-2 230
-2 260
-2 404
-2 384
-2 272
-2 138
-2 84
-2 348
-2 466
-2 58
-2 8
-2 230
-2 208
-2 348
-2 24
-2 172
-2 42
-2 158
-2 496
-2 0
-2 322
-2 468
-2 454
-2 100
-2 298
-2 418
-2 96
-2 26
-2 230
-2 120
-2 404
-2 436
-2 156
-2 468
-2 308
-2 196
-2 288
-2 98
-2 282
-2 318
-2 318
-2 470
-2 316
-2 0
-2 490
-2 364
-2 118
-2 134
-2 282
-2 138
-2 238
-2 118
-2 72
-2 90
-2 10
-2 306
-2 224
-2 242
-2 392
-2 272
-2 242
-2 452
-2 226
-2 402
-2 396
-2 58
-2 336
-2 168
-2 34
-2 472
-2 322
-2 498
-2 160
-2 42
-2 430
-2 458
-2 78
-2 76
-2 492
-2 218
-2 228
-2 138
-2 30
-2 64
-2 468
-2 76
-2 74
-2 342
-2 230
-2 368
-2 296
-2 216
-2 344
-2 274
-2 116
-2 256
-2 70
-2 480
-2 288
-2 244
-2 438
-2 128
-2 432
-2 202
-2 316
-2 280
-2 2
-2 80
-2 44
-2 104
-2 466
-2 366
-2 406
-2 190
-2 406
-2 114
-2 258
-2 90
-2 262
-2 348
-2 424
-2 12
-2 396
-2 164
-2 454
-2 478
-2 298
-2 164
-2 424
-2 382
-2 70
-2 480
-2 24
-2 104
-2 70
-2 438
-2 414
-2 200
-2 360
-2 248
-2 444
-2 120
-2 230
-2 478
-2 178
-2 468
-2 310
-2 460
-2 480
-2 136
-2 172
-2 214
-2 462
-2 406
-2 454
-2 384
-2 256
-2 26
-2 134
-2 384
-2 18
-2 462
-2 492
-2 100
-2 298
-2 498
-2 146
-2 458
-2 362
-2 186
-2 348
-2 18
-2 344
-2 84
-2 28
-2 448
-2 152
-2 348
-2 194
-2 414
-2 222
-2 126
-2 90
-2 400
-2 200
-3 238
-3 86
-3 278
-3 98
-3 484
-3 150
-3 224
-3 66
-3 128
-3 146
-3 406
-3 374
-3 152
-3 82
-3 166
-3 430
-3 252
-3 292
-3 338
-3 446
-3 394
-3 482
-3 174
-3 494
-3 466
-3 208
-3 174
-3 396
-3 162
-3 266
-3 342
-3 0
-3 128
-3 316
-3 302
-3 438
-3 170
-3 20
-3 378
-3 92
-3 72
-3 4
-3 280
-3 208
-3 356
-3 382
-3 498
-3 386
-3 192
-3 286
-3 176
-3 54
-3 138
-3 216
-3 430
-3 278
-3 176
-3 318
-3 332
-3 180
-3 284
-3 12
-3 230
-3 260
-3 404
-3 384
-3 272
-3 138
-3 84
-3 348
-3 466
-3 58
-3 8
-3 230
-3 208
-3 348
-3 24
-3 172
-3 42
-3 158
-3 496
-3 0
-3 322
-3 468
-3 454
-3 100
-3 298
-3 418
-3 96
-3 26
-3 230
-3 120
-3 404
-3 436
-3 156
-3 468
-3 308
-3 196
-3 288
-3 98
-3 282
-3 318
-3 318
-3 470
-3 316
-3 0
-3 490
-3 364
-3 118
-3 134
-3 282
-3 138
-3 238
-3 118
-3 72
-3 90
-3 10
-3 306
-3 224
-3 242
-3 392
-3 272
-3 242
-3 452
-3 226
-3 402
-3 396
-3 58
-3 336
-3 168
-3 34
-3 472
-3 322
-3 498
-3 160
-3 42
-3 430
-3 458
-3 78
-3 76
-3 492
-3 218
-3 228
-3 138
-3 30
-3 64
-3 468
-3 76
-3 74
-3 342
-3 230
-3 368
-3 296
-3 216
-3 344
-3 274
-3 116
-3 256
-3 70
-3 480
-3 288
-3 244
-3 438
-3 128
-3 432
-3 202
-3 316
-3 280
-3 2
-3 80
-3 44
-3 104
-3 466
-3 366
-3 406
-3 190
-3 406
-3 114
-3 258
-3 90
-3 262
-3 348
-3 424
-3 12
-3 396
-3 164
-3 454
-3 478
-3 298
-3 164
-3 424
-3 382
-3 70
-3 480
-3 24
-3 104
-3 70
-3 438
-3 414
-3 200
-3 360
-3 248
-3 444
-3 120
-3 230
-3 478
-3 178
-3 468
-3 310
-3 460
-3 480
-3 136
-3 172
-3 214
-3 462
-3 406
-3 454
-3 384
-3 256
-3 26
-3 134
-3 384
-3 18
-3 462
-3 492
-3 100
-3 298
-3 498
-3 146
-3 458
-3 362
-3 186
-3 348
-3 18
-3 344
-3 84
-3 28
-3 448
-3 152
-3 348
-3 194
-3 414
-3 222
-3 126
-3 90
-3 400
-3 200
-4 238
-4 86
-4 278
-4 98
-4 484
-4 150
-4 224
-4 66
-4 128
-4 146
-4 406
-4 374
-4 152
-4 82
-4 166
-4 430
-4 252
-4 292
-4 338
-4 446
-4 394
-4 482
-4 174
-4 494
-4 466
-4 208
-4 174
-4 396
-4 162
-4 266
-4 342
-4 0
-4 128
-4 316
-4 302
-4 438
-4 170
-4 20
-4 378
-4 92
-4 72
-4 4
-4 280
-4 208
-4 356
-4 382
-4 498
-4 386
-4 192
-4 286
-4 176
-4 54
-4 138
-4 216
-4 430
-4 278
-4 176
-4 318
-4 332
-4 180
-4 284
-4 12
-4 230
-4 260
-4 404
-4 384
-4 272
-4 138
-4 84
-4 348
-4 466
-4 58
-4 8
-4 230
-4 208
-4 348
-4 24
-4 172
-4 42
-4 158
-4 496
-4 0
-4 322
-4 468
-4 454
-4 100
-4 298
-4 418
-4 96
-4 26
-4 230
-4 120
-4 404
-4 436
-4 156
-4 468
-4 308
-4 196
-4 288
-4 98
-4 282
-4 318
-4 318
-4 470
-4 316
-4 0
-4 490
-4 364
-4 118
-4 134
-4 282
-4 138
-4 238
-4 118
-4 72
-4 90
-4 10
-4 306
-4 224
-4 242
-4 392
-4 272
-4 242
-4 452
-4 226
-4 402
-4 396
-4 58
-4 336
-4 168
-4 34
-4 472
-4 322
-4 498
-4 160
-4 42
-4 430
-4 458
-4 78
-4 76
-4 492
-4 218
-4 228
-4 138
-4 30
-4 64
-4 468
-4 76
-4 74
-4 342
-4 230
-4 368
-4 296
-4 216
-4 344
-4 274
-4 116
-4 256
-4 70
-4 480
-4 288
-4 244
-4 438
-4 128
-4 432
-4 202
-4 316
-4 280
-4 2
-4 80
-4 44
-4 104
-4 466
-4 366
-4 406
-4 190
-4 406
-4 114
-4 258
-4 90
-4 262
-4 348
-4 424
-4 12
-4 396
-4 164
-4 454
-4 478
-4 298
-4 164
-4 424
-4 382
-4 70
-4 480
-4 24
-4 104
-4 70
-4 438
-4 414
-4 200
-4 360
-4 248
-4 444
-4 120
-4 230
-4 478
-4 178
-4 468
-4 310
-4 460
-4 480
-4 136
-4 172
-4 214
-4 462
-4 406
-4 454
-4 384
-4 256
-4 26
-4 134
-4 384
-4 18
-4 462
-4 492
-4 100
-4 298
-4 498
-4 146
-4 458
-4 362
-4 186
-4 348
-4 18
-4 344
-4 84
-4 28
-4 448
-4 152
-4 348
-4 194
-4 414
-4 222
-4 126
-4 90
-4 400
-4 200
-PREHOOK: query: select count(*) from (select key from srcpart where key % 2 = 0) t
-PREHOOK: type: QUERY
-PREHOOK: Input: default@srcpart
-PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
-PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
-PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
-PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
-#### A masked pattern was here ####
-POSTHOOK: query: select count(*) from (select key from srcpart where key % 2 = 0) t
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@srcpart
-POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=11
-POSTHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
-POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
-POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
-#### A masked pattern was here ####
-988
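[The long (r1, r2) listings above are execute_splits output: r1 is the split number and r2 a key read from that split. The four 247-row blocks with identical keys line up with the four srcpart partitions, which carry identical data, and together they account for the 988 rows the closing count(*) confirms. Once a split has been rehydrated as sketched earlier, draining it goes through the ordinary mapred contract; a sketch under that assumption, with the input format class name taken from the r1 column of the get_splits output and everything else (class name, raw types) illustrative:

import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;

public class SplitRunner {
  // Instantiate the named InputFormat and pull every record out of one split.
  @SuppressWarnings({"rawtypes", "unchecked"})
  public static long countRows(String ifClassName, InputSplit split,
      JobConf job) throws Exception {
    InputFormat inputFormat =
        (InputFormat) ReflectionUtils.newInstance(Class.forName(ifClassName), job);
    RecordReader reader = inputFormat.getRecordReader(split, job, Reporter.NULL);
    Object key = reader.createKey();
    Object value = reader.createValue();
    long rows = 0;
    while (reader.next(key, value)) {
      rows++;
    }
    reader.close();
    return rows;
  }
}
]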
http://git-wip-us.apache.org/repos/asf/hive/blob/0afaa8f6/ql/src/test/results/clientpositive/tez/udf_get_splits.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/tez/udf_get_splits.q.out b/ql/src/test/results/clientpositive/tez/udf_get_splits.q.out
deleted file mode 100644
index c8ebe88..0000000
--- a/ql/src/test/results/clientpositive/tez/udf_get_splits.q.out
+++ /dev/null
@@ -1,73 +0,0 @@
-PREHOOK: query: DESCRIBE FUNCTION get_splits
-PREHOOK: type: DESCFUNCTION
-POSTHOOK: query: DESCRIBE FUNCTION get_splits
-POSTHOOK: type: DESCFUNCTION
-get_splits(string,int) - Returns an array of length int serialized splits for the referenced tables string.
-PREHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits
-PREHOOK: type: DESCFUNCTION
-POSTHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits
-POSTHOOK: type: DESCFUNCTION
-get_splits(string,int) - Returns an array of length int serialized splits for the referenced tables string.
-PREHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits
-PREHOOK: type: QUERY
-PREHOOK: Input: default@src
-PREHOOK: Output: database:default
-PREHOOK: Output: default@table_74b4da448d74412d8fc0da37a405efb3
-POSTHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@src
-POSTHOOK: Output: database:default
-POSTHOOK: Output: default@table_74b4da448d74412d8fc0da37a405efb3
-PREHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits
-PREHOOK: type: QUERY
-PREHOOK: Input: default@src
-PREHOOK: Output: database:default
-PREHOOK: Output: default@table_4c8d9e53ea514617a6a72158c9c843c5
-POSTHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@src
-POSTHOOK: Output: database:default
-POSTHOOK: Output: default@table_4c8d9e53ea514617a6a72158c9c843c5
-PREHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits
-PREHOOK: type: QUERY
-PREHOOK: Input: default@src
-PREHOOK: Output: database:default
-PREHOOK: Output: default@table_42dcc08d004e411f8038701980b491e3
-POSTHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@src
-POSTHOOK: Output: database:default
-POSTHOOK: Output: default@table_42dcc08d004e411f8038701980b491e3
-PREHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits
-PREHOOK: type: QUERY
-PREHOOK: Input: default@src
-PREHOOK: Output: database:default
-PREHOOK: Output: default@table_9ef6460ac4a24e4fab1ea09ad94b01e3
-POSTHOOK: query: DESCRIBE FUNCTION EXTENDED get_splits
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@src
-POSTHOOK: Output: database:default
-POSTHOOK: Output: default@table_9ef6460ac4a24e4fab1ea09ad94b01e3
-PREHOOK: query: select r.if_class as ic, r.split_class as sc, hash(r.split) as h, length(r.split) as l from (select explode(get_splits("select key, count(*) from src where key % 2 = 0 group by key",5)) as r) t
-PREHOOK: type: QUERY
-PREHOOK: Input: default@src
-PREHOOK: Output: database:default
-PREHOOK: Output: default@table_bf88922034334ab08a9abb6cf8aa546e
-POSTHOOK: query: select r.if_class as ic, r.split_class as sc, hash(r.split) as h, length(r.split) as l from (select explode(get_splits("select key, count(*) from src where key % 2 = 0 group by key",5)) as r) t
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@src
-POSTHOOK: Output: database:default
-POSTHOOK: Output: default@table_bf88922034334ab08a9abb6cf8aa546e
-PREHOOK: query: select r.if_class as ic, r.split_class as sc, hash(r.split) as h, length(r.split) as l from (select explode(get_splits("select key, count(*) from src where key % 2 = 0 group by key",5)) as r) t
-PREHOOK: type: QUERY
-PREHOOK: Input: _dummy_database@_dummy_table
-#### A masked pattern was here ####
-POSTHOOK: query: select r.if_class as ic, r.split_class as sc, hash(r.split) as h, length(r.split) as l from (select explode(get_splits("select key, count(*) from src where key % 2 = 0 group by key",5)) as r) t
-POSTHOOK: type: QUERY
-POSTHOOK: Input: _dummy_database@_dummy_table
-#### A masked pattern was here ####
-class org.apache.hadoop.mapred.TextInputFormat class org.apache.hadoop.mapred.FileSplit -1434872849 218
-class org.apache.hadoop.mapred.TextInputFormat class org.apache.hadoop.mapred.FileSplit 2107621793 218
-class org.apache.hadoop.mapred.TextInputFormat class org.apache.hadoop.mapred.FileSplit -1988206222 218
-class org.apache.hadoop.mapred.TextInputFormat class org.apache.hadoop.mapred.FileSplit 1357774915 218
-class org.apache.hadoop.mapred.TextInputFormat class org.apache.hadoop.mapred.FileSplit 605302265 218
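[For contrast with the LLAP path, this deleted tez golden file records get_splits falling back to plain file-level splits: every row names TextInputFormat and FileSplit, and each serialized FileSplit comes out at 218 bytes. A usage sketch tying the two illustrative helpers above together for one such row; the method name and the source of splitBytes are assumptions, not part of this patch:

import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;

public class SplitExample {
  // One get_splits output row (if_class, split_class, split) -> row count,
  // via the SplitDecoder and SplitRunner sketches above.
  public static long runOneSplit(String ifClass, String splitClass,
      byte[] splitBytes) throws Exception {
    JobConf job = new JobConf();
    InputSplit split = SplitDecoder.decode(splitClass, splitBytes, job);
    return SplitRunner.countRows(ifClass, split, job);
  }
}
]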