Posted to commits@hive.apache.org by jd...@apache.org on 2016/05/06 17:24:56 UTC
[06/39] hive git commit: HIVE-13162: Fixes for LlapDump and FileSinkOperator
HIVE-13162: Fixes for LlapDump and FileSinkOperator
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/57761e34
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/57761e34
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/57761e34
Branch: refs/heads/master
Commit: 57761e34ffec80487a0e678bea6d2dc87bfc9b11
Parents: 7c4b3ff
Author: Gunther Hagleitner <gu...@apache.org>
Authored: Thu Feb 25 16:54:52 2016 -0800
Committer: Gunther Hagleitner <gu...@apache.org>
Committed: Thu Feb 25 17:20:38 2016 -0800
----------------------------------------------------------------------
.../test/resources/testconfiguration.properties | 4 +-
.../org/apache/hadoop/hive/ql/QTestUtil.java | 3 +-
.../src/java/org/apache/hive/jdbc/LlapDump.java | 16 +-
.../org/apache/hive/jdbc/LlapInputFormat.java | 4 +-
.../hadoop/hive/llap/LlapInputFormat.java | 56 +-
.../llap/daemon/impl/ContainerRunnerImpl.java | 10 +-
.../ext/LlapTaskUmbilicalExternalClient.java | 8 -
.../llap/tezplugins/LlapTaskCommunicator.java | 2 +-
.../tezplugins/LlapTaskSchedulerService.java | 4 +-
.../hadoop/hive/llap/LlapInputFormat.java | 159 --
.../hive/llap/LlapOutputFormatService.java | 77 +-
.../hadoop/hive/llap/LlapRecordWriter.java | 10 +-
.../hadoop/hive/ql/exec/FileSinkOperator.java | 13 +
.../hadoop/hive/ql/exec/FunctionRegistry.java | 4 +-
.../hive/ql/exec/tez/HiveSplitGenerator.java | 1 +
.../hive/ql/exec/tez/MapRecordProcessor.java | 4 +-
.../hive/ql/io/HivePassThroughRecordWriter.java | 4 +
.../ql/udf/generic/GenericUDFGetSplits.java | 412 ----
.../udf/generic/GenericUDTFExecuteSplits.java | 124 +
.../ql/udf/generic/GenericUDTFGetSplits.java | 420 ++++
.../org/apache/tez/dag/api/TaskSpecBuilder.java | 4 +-
.../hadoop/hive/llap/TestLlapOutputFormat.java | 62 +-
.../queries/clientpositive/udf_get_splits.q | 6 -
.../queries/clientpositive/udtf_get_splits.q | 43 +
.../clientpositive/llap/udtf_get_splits.q.out | 2130 ++++++++++++++++++
25 files changed, 2874 insertions(+), 706 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index deb9905..13b5113 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -435,7 +435,6 @@ minitez.query.files=bucket_map_join_tez1.q,\
tez_smb_main.q,\
tez_smb_1.q,\
tez_smb_empty.q,\
- udf_get_splits.q,\
vector_join_part_col_char.q,\
vectorized_dynamic_partition_pruning.q,\
tez_multi_union.q,\
@@ -495,7 +494,8 @@ minillap.query.files=bucket_map_join_tez1.q,\
vectorized_dynamic_partition_pruning.q,\
tez_multi_union.q,\
tez_join.q,\
- tez_union_multiinsert.q
+ tez_union_multiinsert.q,\
+ udtf_get_splits.q
encrypted.query.files=encryption_join_unencrypted_tbl.q,\
encryption_insert_partition_static.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index 5e81e98..e524bd2 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -1460,7 +1460,8 @@ public class QTestUtil {
".*Input:.*/data/files/.*",
".*Output:.*/data/files/.*",
".*total number of created files now is.*",
- ".*.hive-staging.*"
+ ".*.hive-staging.*",
+ "table_.*"
});
private final Pattern[] partialReservedPlanMask = toPattern(new String[] {
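The new "table_.*" mask is needed because the get_splits path materializes multi-vertex queries into a temporary table with a randomly generated name (see the CTAS in the get_splits implementation further down in this patch), which would otherwise make q.out files non-deterministic. A minimal sketch of the kind of name being masked, taken from that CTAS logic:

  // Illustration only: the temp table name format the new output mask hides.
  String tableName = "table_" + java.util.UUID.randomUUID().toString().replaceAll("[^A-Za-z0-9 ]", "");
  // e.g. "table_3f2a9c..." -- matched by the "table_.*" pattern added above.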
http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java b/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java
index a807f6c..7ed0a0e 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java
@@ -24,6 +24,7 @@ import java.io.File;
import java.io.IOException;
import java.io.FileInputStream;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
@@ -61,6 +62,7 @@ public class LlapDump {
private static String user = "hive";
private static String pwd = "";
private static String query = "select * from test";
+ private static String numSplits = "1";
public static void main(String[] args) throws Exception {
Options opts = createOptions();
@@ -84,6 +86,10 @@ public class LlapDump {
pwd = cli.getOptionValue("p");
}
+ if (cli.hasOption('n')) {
+ numSplits = cli.getOptionValue("n");
+ }
+
if (cli.getArgs().length > 0) {
query = cli.getArgs()[0];
}
@@ -95,7 +101,7 @@ public class LlapDump {
LlapInputFormat format = new LlapInputFormat(url, user, pwd, query);
JobConf job = new JobConf();
- InputSplit[] splits = format.getSplits(job, 1);
+ InputSplit[] splits = format.getSplits(job, Integer.parseInt(numSplits));
if (splits.length == 0) {
System.out.println("No splits returned - empty scan");
@@ -104,6 +110,7 @@ public class LlapDump {
boolean first = true;
for (InputSplit s: splits) {
+ LOG.info("Processing input split s from " + Arrays.toString(s.getLocations()));
RecordReader<NullWritable, Text> reader = format.getRecordReader(s, job, null);
if (reader instanceof LlapRecordReader && first) {
@@ -122,6 +129,7 @@ public class LlapDump {
System.out.println(value);
}
}
+ System.exit(0);
}
}
@@ -146,6 +154,12 @@ public class LlapDump {
.hasArg()
.create('p'));
+ result.addOption(OptionBuilder
+ .withLongOpt("num")
+ .withDescription("number of splits")
+ .hasArg()
+ .create('n'));
+
return result;
}
}
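With the new -n/--num option, LlapDump asks for a caller-chosen number of splits instead of the hard-coded 1, logs the locations of each split it processes, and now exits explicitly once all splits are drained. A minimal sketch of driving it programmatically; the -n letter comes from this patch, the query is the tool's own default, and the remaining connection options are left at their built-in defaults:

  // Sketch only; relies on LlapDump's built-in defaults for url/user/password.
  public class LlapDumpExample {
    public static void main(String[] args) throws Exception {
      org.apache.hive.jdbc.LlapDump.main(new String[] {
          "-n", "4",              // number of splits, parsed with Integer.parseInt
          "select * from test"    // the query is taken from the remaining arguments
      });
    }
  }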
http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java b/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
index 5af2175..9a7c16d 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
@@ -64,6 +64,8 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
public final String USER_KEY = "llap.if.user";
public final String PWD_KEY = "llap.if.pwd";
+ public final String SPLIT_QUERY = "select get_splits(\"%s\",%d)";
+
private Connection con;
private Statement stmt;
@@ -105,7 +107,7 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
try {
con = DriverManager.getConnection(url,user,pwd);
stmt = con.createStatement();
- String sql = "select r.if_class as ic, r.split_class as sc, r.split as s from (select explode(get_splits(\""+query+"\","+numSplits+")) as r) t";
+ String sql = String.format(SPLIT_QUERY, query, numSplits);
ResultSet res = stmt.executeQuery(sql);
while (res.next()) {
// deserialize split
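The split-fetching SQL is now built from a single format string. Because get_splits is re-registered as a table function elsewhere in this patch (see the FunctionRegistry hunk), it can be selected directly instead of being wrapped in the old explode() subquery. A small illustration of what SPLIT_QUERY expands to, using a placeholder query and split count:

  // Illustration only; mirrors SPLIT_QUERY = "select get_splits(\"%s\",%d)".
  String sql = String.format("select get_splits(\"%s\",%d)", "select * from test", 4);
  // -> select get_splits("select * from test",4)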
http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
index d8066d5..b32d662 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
@@ -67,7 +67,7 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
*/
@Override
public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job,
- Reporter reporter) throws IOException {
+ Reporter reporter) throws IOException {
LlapInputSplit llapSplit = (LlapInputSplit) split;
SubmitWorkInfo submitWorkInfo = SubmitWorkInfo.fromBytes(llapSplit.getPlanBytes());
@@ -75,22 +75,15 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
// TODO HACK: Spark is built with Hive-1.2.1, does not have access to HiveConf.ConfVars.LLAP_DAEMON_RPC_PORT
int llapSubmitPort = job.getInt("hive.llap.daemon.rpc.port", 15001);
- LOG.info("ZZZ: DBG: Starting LlapTaskUmbilicalExternalClient");
-
LlapTaskUmbilicalExternalClient llapClient =
- new LlapTaskUmbilicalExternalClient(job, submitWorkInfo.getTokenIdentifier(),
- submitWorkInfo.getToken());
+ new LlapTaskUmbilicalExternalClient(job, submitWorkInfo.getTokenIdentifier(),
+ submitWorkInfo.getToken());
llapClient.init(job);
llapClient.start();
- LOG.info("ZZZ: DBG: Crated LlapClient");
- // TODO KKK Shutdown the llap client.
-
SubmitWorkRequestProto submitWorkRequestProto =
- constructSubmitWorkRequestProto(submitWorkInfo, llapSplit.getSplitNum(),
- llapClient.getAddress(), submitWorkInfo.getToken());
-
- LOG.info("ZZZ: DBG: Created submitWorkRequest for: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString());
+ constructSubmitWorkRequestProto(submitWorkInfo, llapSplit.getSplitNum(),
+ llapClient.getAddress(), submitWorkInfo.getToken());
TezEvent tezEvent = new TezEvent();
DataInputBuffer dib = new DataInputBuffer();
@@ -116,7 +109,7 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
socket.getOutputStream().write(0);
socket.getOutputStream().flush();
- LOG.debug("Registered id: " + id);
+ LOG.info("Registered id: " + id);
return new LlapRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class);
}
@@ -127,16 +120,16 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
}
private SubmitWorkRequestProto constructSubmitWorkRequestProto(SubmitWorkInfo submitWorkInfo,
- int taskNum,
- InetSocketAddress address,
- Token<JobTokenIdentifier> token) throws
- IOException {
+ int taskNum,
+ InetSocketAddress address,
+ Token<JobTokenIdentifier> token) throws
+ IOException {
TaskSpec taskSpec = submitWorkInfo.getTaskSpec();
ApplicationId appId = submitWorkInfo.getFakeAppId();
SubmitWorkRequestProto.Builder builder = SubmitWorkRequestProto.newBuilder();
// This works, assuming the executor is running within YARN.
- LOG.info("DBG: Setting user in submitWorkRequest to: " +
+ LOG.info("Setting user in submitWorkRequest to: " +
System.getenv(ApplicationConstants.Environment.USER.name()));
builder.setUser(System.getenv(ApplicationConstants.Environment.USER.name()));
builder.setApplicationIdString(appId.toString());
@@ -144,10 +137,9 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
builder.setTokenIdentifier(appId.toString());
ContainerId containerId =
- ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, 0), taskNum);
+ ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, 0), taskNum);
builder.setContainerIdString(containerId.toString());
-
builder.setAmHost(address.getHostName());
builder.setAmPort(address.getPort());
Credentials taskCredentials = new Credentials();
@@ -155,18 +147,18 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
// TODO Figure out where credentials will come from. Normally Hive sets up
// URLs on the tez dag, for which Tez acquires credentials.
-// taskCredentials.addAll(getContext().getCredentials());
-
-// Preconditions.checkState(currentQueryIdentifierProto.getDagIdentifier() ==
-// taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId());
-// ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto);
-// if (credentialsBinary == null) {
-// credentialsBinary = serializeCredentials(getContext().getCredentials());
-// credentialMap.putIfAbsent(currentQueryIdentifierProto, credentialsBinary.duplicate());
-// } else {
-// credentialsBinary = credentialsBinary.duplicate();
-// }
-// builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
+ // taskCredentials.addAll(getContext().getCredentials());
+
+ // Preconditions.checkState(currentQueryIdentifierProto.getDagIdentifier() ==
+ // taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId());
+ // ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto);
+ // if (credentialsBinary == null) {
+ // credentialsBinary = serializeCredentials(getContext().getCredentials());
+ // credentialMap.putIfAbsent(currentQueryIdentifierProto, credentialsBinary.duplicate());
+ // } else {
+ // credentialsBinary = credentialsBinary.duplicate();
+ // }
+ // builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
Credentials credentials = new Credentials();
TokenCache.setSessionToken(token, credentials);
ByteBuffer credentialsBinary = serializeCredentials(credentials);
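Beyond dropping the ZZZ/DBG logging, the request construction here gives every external split its own container id under the fake application id, using the split number as the container number. A self-contained sketch of just that mapping, with names matching the patch:

  import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
  import org.apache.hadoop.yarn.api.records.ApplicationId;
  import org.apache.hadoop.yarn.api.records.ContainerId;

  public class ContainerIdSketch {
    // taskNum is the split number passed into constructSubmitWorkRequestProto.
    public static ContainerId forSplit(ApplicationId fakeAppId, int taskNum) {
      return ContainerId.newInstance(ApplicationAttemptId.newInstance(fakeAppId, 0), taskNum);
    }
  }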
http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index e80fb15..d47355a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -265,10 +265,10 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
List<QueryFragmentInfo> knownFragments =
queryTracker
.queryComplete(queryIdentifier, request.getDeleteDelay());
- LOG.info("DBG: Pending fragment count for completed query {} = {}", queryIdentifier,
+ LOG.info("Pending fragment count for completed query {} = {}", queryIdentifier,
knownFragments.size());
for (QueryFragmentInfo fragmentInfo : knownFragments) {
- LOG.info("DBG: Issuing killFragment for completed query {} {}", queryIdentifier,
+ LOG.info("Issuing killFragment for completed query {} {}", queryIdentifier,
fragmentInfo.getFragmentIdentifierString());
executorService.killFragment(fragmentInfo.getFragmentIdentifierString());
}
@@ -277,7 +277,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
@Override
public TerminateFragmentResponseProto terminateFragment(TerminateFragmentRequestProto request) {
- LOG.info("DBG: Received terminateFragment request for {}", request.getFragmentIdentifierString());
+ LOG.info("Received terminateFragment request for {}", request.getFragmentIdentifierString());
executorService.killFragment(request.getFragmentIdentifierString());
return TerminateFragmentResponseProto.getDefaultInstance();
}
@@ -356,10 +356,10 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
LOG.info("Processing query failed notification for {}", queryIdentifier);
List<QueryFragmentInfo> knownFragments =
queryTracker.queryComplete(queryIdentifier, -1);
- LOG.info("DBG: Pending fragment count for failed query {} = {}", queryIdentifier,
+ LOG.info("Pending fragment count for failed query {} = {}", queryIdentifier,
knownFragments.size());
for (QueryFragmentInfo fragmentInfo : knownFragments) {
- LOG.info("DBG: Issuing killFragment for failed query {} {}", queryIdentifier,
+ LOG.info("Issuing killFragment for failed query {} {}", queryIdentifier,
fragmentInfo.getFragmentIdentifierString());
executorService.killFragment(fragmentInfo.getFragmentIdentifierString());
}
http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
index 4305682..16cfd01 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -88,12 +88,6 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
public void submitWork(final SubmitWorkRequestProto submitWorkRequestProto, String llapHost, int llapPort, List<TezEvent> tezEvents) {
Preconditions.checkArgument(submitWorkRequestProto.getUsingTezAm() == false);
-
- LOG.warn("ZZZ: DBG: " + " Submitting fragment: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString() + " on host: " + llapHost + ", port=" + llapPort);
-// LOG.info("ZZZ: DBG: " + " Complete SubmitWorkRequest: " + submitWorkRequestProto);
-// submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString()
-
- LOG.info("ZZZ: DBG: Received {} events for {}", tezEvents.size(), submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString());
// Register the pending events to be sent for this spec.
pendingEvents.putIfAbsent(submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(), tezEvents);
@@ -109,7 +103,6 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
return;
}
}
- LOG.info("DBG: Submitted " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString());
}
@Override
@@ -166,7 +159,6 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
TezHeartbeatResponse response = new TezHeartbeatResponse();
// Assuming TaskAttemptId and FragmentIdentifierString are the same. Verify this.
TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID();
- LOG.info("ZZZ: DBG: Received heartbeat from taskAttemptId: " + taskAttemptId.toString());
List<TezEvent> tezEvents = pendingEvents.remove(taskAttemptId.toString());
if (tezEvents == null) {
http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index 76d095a..91e4323 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -349,7 +349,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
private void sendTaskTerminated(final TezTaskAttemptID taskAttemptId,
boolean invokedByContainerEnd) {
LOG.info(
- "DBG: Attempting to send terminateRequest for fragment {} due to internal preemption invoked by {}",
+ "Attempting to send terminateRequest for fragment {} due to internal preemption invoked by {}",
taskAttemptId.toString(), invokedByContainerEnd ? "containerEnd" : "taskEnd");
LlapNodeId nodeId = entityTracker.getNodeIdForTaskAttempt(taskAttemptId);
// NodeId can be null if the task gets unregistered due to failure / being killed by the daemon itself
http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 3bca0da..e1ad12d 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -931,7 +931,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
} else {
// No tasks qualify as preemptable
- LOG.info("DBG: No tasks qualify as killable to schedule tasks at priority {}", forPriority);
+ LOG.info("No tasks qualify as killable to schedule tasks at priority {}", forPriority);
break;
}
}
@@ -941,7 +941,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
// Send out the preempted request outside of the lock.
if (preemptedTaskList != null) {
for (TaskInfo taskInfo : preemptedTaskList) {
- LOG.info("DBG: Preempting task {}", taskInfo);
+ LOG.info("Preempting task {}", taskInfo);
getContext().preemptContainer(taskInfo.containerId);
// Preemption will finally be registered as a deallocateTask as a result of preemptContainer
// That resets preemption info and allows additional tasks to be pre-empted if required.
http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/ql/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
deleted file mode 100644
index d308ec8..0000000
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.llap;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-
-import javax.security.auth.login.LoginException;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.tez.TezProcessor;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TaskSpecBuilder;
-import org.apache.tez.runtime.api.impl.TaskSpec;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.Socket;
-
-import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
-
-import com.esotericsoftware.kryo.Kryo;
-import java.io.ByteArrayOutputStream;
-import java.io.OutputStream;
-import java.io.InputStream;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URISyntaxException;
-import java.io.FileNotFoundException;
-import java.util.UUID;
-
-import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.commons.io.FilenameUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.SplitLocationInfo;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.runtime.api.Event;
-import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
-import org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator;
-import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.TezWork;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.Schema;
-import org.apache.tez.dag.api.TaskLocationHint;
-import org.apache.tez.dag.api.VertexLocationHint;
-import org.apache.tez.dag.api.event.VertexStateUpdate;
-import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
-import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
-import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
-import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
-import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.InputInitializer;
-import org.apache.tez.runtime.api.InputInitializerContext;
-import org.apache.tez.runtime.api.InputSpecUpdate;
-import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
-import org.apache.tez.runtime.api.events.InputDataInformationEvent;
-import org.apache.tez.runtime.api.events.InputInitializerEvent;
-
-import com.google.common.base.Preconditions;
-
-public class LlapInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> {
-
- private static final Logger LOG = LoggerFactory.getLogger(LlapInputFormat.class);
-
- private final TezWork work;
- private final Schema schema;
-
- public LlapInputFormat(TezWork tezWork, Schema schema) {
- this.work = tezWork;
- this.schema = schema;
- }
-
- // need empty constructor for bean instantiation
- public LlapInputFormat() {
- // None of these fields should be required during getRecordReader,
- // and should not be read.
- work = null;
- schema = null;
- }
-
- /*
- * This proxy record reader has the duty of establishing a connected socket with LLAP, then fire
- * off the work in the split to LLAP and finally return the connected socket back in an
- * LlapRecordReader. The LlapRecordReader class reads the results from the socket.
- */
- @Override
- public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
-
- // Calls a static method to ensure none of the object fields are read.
- return _getRecordReader(split, job, reporter);
- }
-
- private static RecordReader _getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws
- IOException {
- LlapInputSplit llapSplit = (LlapInputSplit)split;
-
- // TODO: push event into LLAP
-
- // this is just the portion that sets up the io to receive data
- String host = split.getLocations()[0];
-
- // TODO: need to construct id here. Format is queryId + "_" + taskIndex
- String id = "foobar";
-
- HiveConf conf = new HiveConf();
- Socket socket = new Socket(host,
- conf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT));
-
- LOG.debug("Socket connected");
-
- socket.getOutputStream().write(id.getBytes());
- socket.getOutputStream().write(0);
- socket.getOutputStream().flush();
-
- LOG.debug("Registered id: " + id);
-
- return new LlapRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class);
- }
-
- @Override
- public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
- throw new IOException("These are not the splits you are looking for.");
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
index 4f38ff1..a197d7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
@@ -81,45 +81,45 @@ public class LlapOutputFormatService {
public void start() throws IOException {
executor.submit(new Runnable() {
- byte[] buffer = new byte[4096];
- @Override
- public void run() {
- while (true) {
- Socket s = null;
- try {
- s = socket.accept();
- String id = readId(s);
- LOG.debug("Received: "+id);
- registerReader(s, id);
- } catch (IOException io) {
- if (s != null) {
- try{
- s.close();
- } catch (IOException io2) {
- // ignore
- }
- }
- }
- }
- }
+ byte[] buffer = new byte[4096];
+ @Override
+ public void run() {
+ while (true) {
+ Socket s = null;
+ try {
+ s = socket.accept();
+ String id = readId(s);
+ LOG.debug("Received: "+id);
+ registerReader(s, id);
+ } catch (IOException io) {
+ if (s != null) {
+ try{
+ s.close();
+ } catch (IOException io2) {
+ // ignore
+ }
+ }
+ }
+ }
+ }
- private String readId(Socket s) throws IOException {
- InputStream in = s.getInputStream();
- int idx = 0;
- while((buffer[idx++] = (byte)in.read()) != '\0') {}
- return new String(buffer,0,idx-1);
- }
+ private String readId(Socket s) throws IOException {
+ InputStream in = s.getInputStream();
+ int idx = 0;
+ while((buffer[idx++] = (byte)in.read()) != '\0') {}
+ return new String(buffer,0,idx-1);
+ }
- private void registerReader(Socket s, String id) throws IOException {
- synchronized(service) {
- LOG.debug("registering socket for: "+id);
- LlapRecordWriter writer = new LlapRecordWriter(s.getOutputStream());
- writers.put(id, writer);
- service.notifyAll();
+ private void registerReader(Socket s, String id) throws IOException {
+ synchronized(service) {
+ LOG.debug("registering socket for: "+id);
+ LlapRecordWriter writer = new LlapRecordWriter(s.getOutputStream());
+ writers.put(id, writer);
+ service.notifyAll();
+ }
+ }
}
- }
- }
- );
+ );
}
public void stop() throws IOException, InterruptedException {
@@ -132,10 +132,11 @@ public class LlapOutputFormatService {
RecordWriter writer = null;
synchronized(service) {
while ((writer = writers.get(id)) == null) {
- LOG.debug("Waiting for writer for: "+id);
- service.wait();
+ LOG.info("Waiting for writer for: "+id);
+ service.wait();
}
}
+ LOG.info("Returning writer for: "+id);
return writer;
}
}
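Apart from re-indentation and promoting a couple of log lines to INFO, the service body is unchanged: the accept loop reads a NUL-terminated id from each connecting reader and registers an LlapRecordWriter for it, while getWriter() blocks until that registration happens. A minimal sketch of the client side of that handshake (the same byte sequence LlapInputFormat writes), assuming host, port and id are already known to the caller:

  import java.io.InputStream;
  import java.io.OutputStream;
  import java.net.Socket;

  public class LlapOutputHandshakeSketch {
    // Sketch only: connect, send "<id>\0", then read result rows from the returned stream.
    public static InputStream connect(String host, int outputServicePort, String id) throws Exception {
      Socket socket = new Socket(host, outputServicePort);
      OutputStream out = socket.getOutputStream();
      out.write(id.getBytes());  // id is "<queryId>_<taskIndex>", as set in MapRecordProcessor
      out.write(0);              // NUL terminator matched by readId() above
      out.flush();
      return socket.getInputStream();  // LlapRecordReader consumes this stream
    }
  }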
http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java
index 4d1996c..b632fae 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java
@@ -20,19 +20,18 @@ package org.apache.hadoop.hive.llap;
import java.io.IOException;
import java.io.OutputStream;
-import java.io.DataOutputStream;;
+import java.io.DataOutputStream;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class LlapRecordWriter<K extends Writable, V extends WritableComparable>
implements RecordWriter<K,V> {
+ public static final Logger LOG = LoggerFactory.getLogger(LlapRecordWriter.class);
DataOutputStream dos;
@@ -42,6 +41,7 @@ public class LlapRecordWriter<K extends Writable, V extends WritableComparable>
@Override
public void close(Reporter reporter) throws IOException {
+ LOG.info("CLOSING the record writer output stream");
dos.close();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 0899793..02439be 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -36,11 +36,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.llap.LlapOutputFormat;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
@@ -202,6 +204,17 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
}
}
try {
+ if ("org.apache.hadoop.hive.llap.LlapStorageHandler".equals(getConf().getTableInfo().getProperties().
+ get(hive_metastoreConstants.META_TABLE_STORAGE))) {
+ (new LlapOutputFormat())
+ .getRecordWriter(null,
+ hconf instanceof JobConf ? (JobConf) hconf : new JobConf(hconf), null, null)
+ .close(null);
+ }
+ } catch (IOException e) {
+ // ignored
+ }
+ try {
for (int i = 0; i < updaters.length; i++) {
if (updaters[i] != null) {
updaters[i].close(abort);
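The block added to closeOp opens and immediately closes an LlapOutputFormat writer whenever the destination table's storage handler is LlapStorageHandler; a plausible reading (the intent is inferred, not stated in the patch) is that this terminates the fragment's LLAP output stream even when the sink wrote no rows, so the external reader sees end-of-stream rather than hanging, and any IOException is deliberately swallowed. A condensed, self-contained sketch of just that guard:

  import java.io.IOException;
  import java.util.Properties;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hive.llap.LlapOutputFormat;
  import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
  import org.apache.hadoop.mapred.JobConf;

  public class LlapSinkCloseSketch {
    // Sketch of the guard FileSinkOperator.closeOp now performs (names from the patch).
    static void closeLlapStreamIfNeeded(Properties tableProps, Configuration hconf) {
      if ("org.apache.hadoop.hive.llap.LlapStorageHandler".equals(
          tableProps.get(hive_metastoreConstants.META_TABLE_STORAGE))) {
        try {
          new LlapOutputFormat()
              .getRecordWriter(null,
                  hconf instanceof JobConf ? (JobConf) hconf : new JobConf(hconf), null, null)
              .close(null);
        } catch (IOException e) {
          // ignored, as in the patch
        }
      }
    }
  }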
http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
index f3afa24..c782466 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
@@ -344,8 +344,6 @@ public final class FunctionRegistry {
system.registerGenericUDF("ewah_bitmap_or", GenericUDFEWAHBitmapOr.class);
system.registerGenericUDF("ewah_bitmap_empty", GenericUDFEWAHBitmapEmpty.class);
- system.registerGenericUDF("get_splits", GenericUDFGetSplits.class);
-
// Aliases for Java Class Names
// These are used in getImplicitConvertUDFMethod
system.registerUDF(serdeConstants.BOOLEAN_TYPE_NAME, UDFToBoolean.class, false, UDFToBoolean.class.getSimpleName());
@@ -444,6 +442,8 @@ public final class FunctionRegistry {
system.registerGenericUDTF("parse_url_tuple", GenericUDTFParseUrlTuple.class);
system.registerGenericUDTF("posexplode", GenericUDTFPosExplode.class);
system.registerGenericUDTF("stack", GenericUDTFStack.class);
+ system.registerGenericUDTF("get_splits", GenericUDTFGetSplits.class);
+ system.registerGenericUDTF("execute_splits", GenericUDTFExecuteSplits.class);
//PTF declarations
system.registerGenericUDF(LEAD_FUNC_NAME, GenericUDFLead.class);
http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index 011e459..b16368f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -92,6 +92,7 @@ public class HiveSplitGenerator extends InputInitializer {
this.conf = conf;
this.work = work;
+ this.jobConf = new JobConf(conf);
// TODO RSHACK - assuming grouping enabled always.
userPayloadProto = MRInputUserPayloadProto.newBuilder().setGroupingEnabled(true).build();
http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 3fe70ab..7a3d6a6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -95,7 +95,9 @@ public class MapRecordProcessor extends RecordProcessor {
super(jconf, context);
String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
if (LlapProxy.isDaemon()) { // do not cache plan
- jconf.set(LlapOutputFormat.LLAP_OF_ID_KEY, queryId + "_" + context.getTaskIndex());
+ String id = queryId + "_" + context.getTaskIndex();
+ l4j.info("LLAP_OF_ID: "+id);
+ jconf.set(LlapOutputFormat.LLAP_OF_ID_KEY, id);
cache = new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
} else {
cache = ObjectCacheFactory.getCache(jconf, queryId);
http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java
index 454c321..6d00a0a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java
@@ -23,11 +23,14 @@ import java.io.IOException;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class HivePassThroughRecordWriter <K extends WritableComparable<?>, V extends Writable>
implements RecordWriter {
+ public static final Logger LOG = LoggerFactory.getLogger(HivePassThroughRecordWriter.class);
private final org.apache.hadoop.mapred.RecordWriter<K, V> mWriter;
public HivePassThroughRecordWriter(org.apache.hadoop.mapred.RecordWriter<K, V> writer) {
@@ -42,6 +45,7 @@ implements RecordWriter {
@Override
public void close(boolean abort) throws IOException {
+ LOG.info("Closing the pass through writer.");
//close with null reporter
mWriter.close(null);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java
deleted file mode 100644
index f69dea3..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java
+++ /dev/null
@@ -1,412 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.udf.generic;
-
-import javax.security.auth.login.LoginException;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-import java.util.UUID;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.google.common.base.Preconditions;
-import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.commons.io.FilenameUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.LlapInputSplit;
-import org.apache.hadoop.hive.llap.LlapOutputFormat;
-import org.apache.hadoop.hive.llap.SubmitWorkInfo;
-import org.apache.hadoop.hive.metastore.api.Schema;
-import org.apache.hadoop.hive.ql.CommandNeedRetryException;
-import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.Driver;
-import org.apache.hadoop.hive.ql.QueryPlan;
-import org.apache.hadoop.hive.ql.exec.Description;
-import org.apache.hadoop.hive.ql.exec.MapredContext;
-import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
-import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
-import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
-import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
-import org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator;
-import org.apache.hadoop.hive.ql.exec.tez.TezTask;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.TezWork;
-import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.udf.UDFType;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SplitLocationInfo;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.TaskLocationHint;
-import org.apache.tez.dag.api.TaskSpecBuilder;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
-import org.apache.tez.runtime.api.impl.EventMetaData;
-import org.apache.tez.runtime.api.impl.TaskSpec;
-import org.apache.tez.runtime.api.impl.TezEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * GenericUDFGetSplits.
- *
- */
-@Description(name = "get_splits", value = "_FUNC_(string,int) - "
- + "Returns an array of length int serialized splits for the referenced tables string.")
-@UDFType(deterministic = false)
-public class GenericUDFGetSplits extends GenericUDF {
-
- private static final Logger LOG = LoggerFactory.getLogger(GenericUDFGetSplits.class);
-
- private static final String LLAP_INTERNAL_INPUT_FORMAT_NAME = "org.apache.hadoop.hive.llap.LlapInputFormat";
-
- private transient StringObjectInspector stringOI;
- private transient IntObjectInspector intOI;
- private final ArrayList<Object> retArray = new ArrayList<Object>();
- private transient JobConf jc;
- private transient Hive db;
- private ByteArrayOutputStream bos;
- private DataOutput dos;
-
- @Override
- public ObjectInspector initialize(ObjectInspector[] arguments)
- throws UDFArgumentException {
-
- LOG.debug("initializing GenericUDFGetSplits");
-
- try {
- if (SessionState.get() != null && SessionState.get().getConf() != null) {
- HiveConf conf = SessionState.get().getConf();
- jc = DagUtils.getInstance().createConfiguration(conf);
- db = Hive.get(conf);
- } else {
- jc = MapredContext.get().getJobConf();
- db = Hive.get();
- }
- } catch(Exception e) {
- LOG.error("Failed to initialize: ",e);
- throw new UDFArgumentException(e);
- }
-
- LOG.debug("Initialized conf, jc and metastore connection");
-
- if (arguments.length != 2) {
- throw new UDFArgumentLengthException("The function GET_SPLITS accepts 2 arguments.");
- } else if (!(arguments[0] instanceof StringObjectInspector)) {
- LOG.error("Got "+arguments[0].getTypeName()+" instead of string.");
- throw new UDFArgumentTypeException(0, "\""
- + "string\" is expected at function GET_SPLITS, " + "but \""
- + arguments[0].getTypeName() + "\" is found");
- } else if (!(arguments[1] instanceof IntObjectInspector)) {
- LOG.error("Got "+arguments[1].getTypeName()+" instead of int.");
- throw new UDFArgumentTypeException(1, "\""
- + "int\" is expected at function GET_SPLITS, " + "but \""
- + arguments[1].getTypeName() + "\" is found");
- }
-
- stringOI = (StringObjectInspector) arguments[0];
- intOI = (IntObjectInspector) arguments[1];
-
- List<String> names = Arrays.asList("if_class","split_class","split");
- List<ObjectInspector> fieldOIs = Arrays.<ObjectInspector>asList(
- PrimitiveObjectInspectorFactory.javaStringObjectInspector,
- PrimitiveObjectInspectorFactory.javaStringObjectInspector,
- PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector);
- ObjectInspector outputOI = ObjectInspectorFactory.getStandardStructObjectInspector(names, fieldOIs);
- ObjectInspector listOI = ObjectInspectorFactory.getStandardListObjectInspector(outputOI);
- bos = new ByteArrayOutputStream(1024);
- dos = new DataOutputStream(bos);
-
- LOG.debug("done initializing GenericUDFGetSplits");
- return listOI;
- }
-
- @Override
- public Object evaluate(DeferredObject[] arguments) throws HiveException {
- retArray.clear();
-
- String query = stringOI.getPrimitiveJavaObject(arguments[0].get());
-
- int num = intOI.get(arguments[1].get());
-
- Driver driver = new Driver();
- CommandProcessorResponse cpr;
-
- HiveConf conf = SessionState.get().getConf();
-
- if (conf == null) {
- throw new HiveException("Need configuration");
- }
-
- String fetchTaskConversion = HiveConf.getVar(conf, ConfVars.HIVEFETCHTASKCONVERSION);
- String queryResultFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
-
- try {
- LOG.info("setting fetch.task.conversion to none and query file format to \""+LlapOutputFormat.class.getName()+"\"");
- HiveConf.setVar(conf, ConfVars.HIVEFETCHTASKCONVERSION, "none");
- HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, "Llap");
-
- cpr = driver.compileAndRespond(query);
- if(cpr.getResponseCode() != 0) {
- throw new HiveException("Failed to compile query: "+cpr.getException());
- }
-
- QueryPlan plan = driver.getPlan();
- List<Task<?>> roots = plan.getRootTasks();
- Schema schema = plan.getResultSchema();
-
- if (roots == null || roots.size() != 1 || !(roots.get(0) instanceof TezTask)) {
- throw new HiveException("Was expecting a single TezTask.");
- }
-
- TezWork tezWork = ((TezTask)roots.get(0)).getWork();
-
- if (tezWork.getAllWork().size() != 1) {
-
- String tableName = "table_"+UUID.randomUUID().toString().replaceAll("[^A-Za-z0-9 ]", "");
-
- String ctas = "create temporary table "+tableName+" as "+query;
- LOG.info("CTAS: "+ctas);
-
- try {
- cpr = driver.run(ctas, false);
- } catch(CommandNeedRetryException e) {
- throw new HiveException(e);
- }
-
- if(cpr.getResponseCode() != 0) {
- throw new HiveException("Failed to create temp table: " + cpr.getException());
- }
-
- query = "select * from " + tableName;
- cpr = driver.compileAndRespond(query);
- if(cpr.getResponseCode() != 0) {
- throw new HiveException("Failed to create temp table: "+cpr.getException());
- }
-
- plan = driver.getPlan();
- roots = plan.getRootTasks();
- schema = plan.getResultSchema();
-
- if (roots == null || roots.size() != 1 || !(roots.get(0) instanceof TezTask)) {
- throw new HiveException("Was expecting a single TezTask.");
- }
-
- tezWork = ((TezTask)roots.get(0)).getWork();
- }
-
- MapWork w = (MapWork)tezWork.getAllWork().get(0);
-
- try {
- for (InputSplit s: getSplits(jc, num, tezWork, schema)) {
- Object[] os = new Object[3];
- os[0] = LLAP_INTERNAL_INPUT_FORMAT_NAME;
- os[1] = s.getClass().getName();
- bos.reset();
- s.write(dos);
- byte[] frozen = bos.toByteArray();
- os[2] = frozen;
- retArray.add(os);
- }
- } catch(Exception e) {
- throw new HiveException(e);
- }
- } finally {
- HiveConf.setVar(conf, ConfVars.HIVEFETCHTASKCONVERSION, fetchTaskConversion);
- HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, queryResultFormat);
- }
- return retArray;
- }
-
- public InputSplit[] getSplits(JobConf job, int numSplits, TezWork work, Schema schema) throws IOException {
- DAG dag = DAG.create(work.getName());
- dag.setCredentials(job.getCredentials());
- // TODO: set access control? TezTask.setAccessControlsForCurrentUser(dag);
-
- DagUtils utils = DagUtils.getInstance();
- Context ctx = new Context(job);
- MapWork mapWork = (MapWork) work.getAllWork().get(0);
- // bunch of things get setup in the context based on conf but we need only the MR tmp directory
- // for the following method.
- JobConf wxConf = utils.initializeVertexConf(job, ctx, mapWork);
- Path scratchDir = utils.createTezDir(ctx.getMRScratchDir(), job);
- FileSystem fs = scratchDir.getFileSystem(job);
- try {
- LocalResource appJarLr = createJarLocalResource(utils.getExecJarPathLocal(), utils, job);
- Vertex wx = utils.createVertex(wxConf, mapWork, scratchDir, appJarLr,
- new ArrayList<LocalResource>(), fs, ctx, false, work,
- work.getVertexType(mapWork));
- String vertexName = wx.getName();
- dag.addVertex(wx);
- utils.addCredentials(mapWork, dag);
-
-
- // we have the dag now proceed to get the splits:
- HiveSplitGenerator splitGenerator = new HiveSplitGenerator(null);
- Preconditions.checkState(HiveConf.getBoolVar(wxConf,
- HiveConf.ConfVars.HIVE_TEZ_GENERATE_CONSISTENT_SPLITS));
- Preconditions.checkState(HiveConf.getBoolVar(wxConf,
- HiveConf.ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS));
- splitGenerator.initializeSplitGenerator(wxConf, mapWork);
- List<Event> eventList = splitGenerator.initialize();
-
- // hack - just serializing with kryo for now. This needs to be done properly
- InputSplit[] result = new InputSplit[eventList.size() - 1];
- DataOutputBuffer dob = new DataOutputBuffer();
-
- InputConfigureVertexTasksEvent configureEvent = (InputConfigureVertexTasksEvent) eventList.get(0);
-
- List<TaskLocationHint> hints = configureEvent.getLocationHint().getTaskLocationHints();
-
- Preconditions.checkState(hints.size() == eventList.size() -1);
-
- LOG.error("DBG: NumEvents=" + eventList.size());
- LOG.error("DBG: NumSplits=" + result.length);
-
- ApplicationId fakeApplicationId = ApplicationId.newInstance(Math.abs(new Random().nextInt()), 0);
- TaskSpec taskSpec =
- new TaskSpecBuilder().constructTaskSpec(dag, vertexName, eventList.size() - 1, fakeApplicationId);
-
- SubmitWorkInfo submitWorkInfo =
- new SubmitWorkInfo(taskSpec, fakeApplicationId, System.currentTimeMillis());
- EventMetaData sourceMetaData =
- new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, vertexName,
- "NULL_VERTEX", null);
- EventMetaData destinationMetaInfo = new TaskSpecBuilder().getDestingationMetaData(wx);
-
- LOG.info("DBG: Number of splits: " + (eventList.size() - 1));
- for (int i = 0; i < eventList.size() - 1; i++) {
- // Creating the TezEvent here itself, since it's easy to serialize.
- Event event = eventList.get(i + 1);
- TaskLocationHint hint = hints.get(i);
- Set<String> hosts = hint.getHosts();
- LOG.info("DBG: Using locations: " + hosts.toString());
- if (hosts.size() != 1) {
- LOG.warn("DBG: Bad # of locations: " + hosts.size());
- }
- SplitLocationInfo[] locations = new SplitLocationInfo[hosts.size()];
-
- int j = 0;
- for (String host : hosts) {
- locations[j++] = new SplitLocationInfo(host, false);
- }
- TezEvent tezEvent = new TezEvent(event, sourceMetaData, System.currentTimeMillis());
- tezEvent.setDestinationInfo(destinationMetaInfo);
-
- bos.reset();
- dob.reset();
- tezEvent.write(dob);
-
- byte[] submitWorkBytes = SubmitWorkInfo.toBytes(submitWorkInfo);
-
- result[i] =
- new LlapInputSplit(i, submitWorkBytes, dob.getData(), locations, schema);
- }
- return result;
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-
- /**
- * Returns a local resource representing a jar. This resource will be used to execute the plan on
- * the cluster.
- *
- * @param localJarPath
- * Local path to the jar to be localized.
- * @return LocalResource corresponding to the localized hive exec resource.
- * @throws IOException
- * when any file system related call fails.
- * @throws LoginException
- * when we are unable to determine the user.
- * @throws URISyntaxException
- * when current jar location cannot be determined.
- */
- private LocalResource createJarLocalResource(String localJarPath, DagUtils utils,
- Configuration conf)
- throws IOException, LoginException, IllegalArgumentException, FileNotFoundException {
- FileStatus destDirStatus = utils.getHiveJarDirectory(conf);
- assert destDirStatus != null;
- Path destDirPath = destDirStatus.getPath();
-
- Path localFile = new Path(localJarPath);
- String sha = getSha(localFile, conf);
-
- String destFileName = localFile.getName();
-
- // Now, try to find the file based on SHA and name. Currently we require exact name match.
- // We could also allow cutting off versions and other stuff provided that SHA matches...
- destFileName = FilenameUtils.removeExtension(destFileName) + "-" + sha
- + FilenameUtils.EXTENSION_SEPARATOR + FilenameUtils.getExtension(destFileName);
-
- // TODO: if this method is ever called on more than one jar, getting the dir and the
- // list need to be refactored out to be done only once.
- Path destFile = new Path(destDirPath.toString() + "/" + destFileName);
- return utils.localizeResource(localFile, destFile, LocalResourceType.FILE, conf);
- }
-
- private String getSha(Path localFile, Configuration conf)
- throws IOException, IllegalArgumentException {
- InputStream is = null;
- try {
- FileSystem localFs = FileSystem.getLocal(conf);
- is = localFs.open(localFile);
- return DigestUtils.sha256Hex(is);
- } finally {
- if (is != null) {
- is.close();
- }
- }
- }
-
- @Override
- public String getDisplayString(String[] children) {
- assert children.length == 2;
- return getStandardDisplayString("get_splits", children);
- }
-}
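The deleted UDF returned its result as an in-memory array of (if_class, split_class, split) structs, which is why the old JDBC client had to wrap it in explode(); its split-generation and CTAS-fallback logic now lives in the GenericUDTFGetSplits added below. Consumers still need to rehydrate each split from its class name and serialized bytes; a hedged sketch of that step, assuming the new UDTF keeps a comparable per-row triple (only part of the new class is visible in this excerpt, and the actual JDBC deserialization code is not shown in this diff):

  import java.io.ByteArrayInputStream;
  import java.io.DataInputStream;
  import org.apache.hadoop.mapred.InputSplit;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.util.ReflectionUtils;

  public class SplitRehydrateSketch {
    // Sketch only: rebuild an InputSplit from the (split_class, split bytes) pair emitted
    // per row. ReflectionUtils is one plausible way to instantiate it; this is an assumed
    // consumer-side helper, not code from the patch.
    static InputSplit rehydrate(String splitClass, byte[] bytes, JobConf conf) throws Exception {
      InputSplit split = (InputSplit) ReflectionUtils.newInstance(
          conf.getClassByName(splitClass), conf);
      split.readFields(new DataInputStream(new ByteArrayInputStream(bytes)));
      return split;
    }
  }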
http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExecuteSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExecuteSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExecuteSplits.java
new file mode 100644
index 0000000..12759ab
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExecuteSplits.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.udf.UDFType;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDTFGetSplits.PlanFragment;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.RecordReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * GenericUDTFExecuteSplits.
+ *
+ */
+@Description(name = "execute_splits", value = "_FUNC_(string,int) - "
+    + "Executes up to int splits generated for the query string and returns the rows they produce.")
+@UDFType(deterministic = false)
+public class GenericUDTFExecuteSplits extends GenericUDTFGetSplits {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GenericUDTFExecuteSplits.class);
+
+ @Override
+ public StructObjectInspector initialize(ObjectInspector[] arguments)
+ throws UDFArgumentException {
+
+ LOG.debug("initializing ExecuteSplits");
+
+ if (SessionState.get() == null || SessionState.get().getConf() == null) {
+ throw new IllegalStateException("Cannot run execute splits outside HS2");
+ }
+
+ if (arguments.length != 2) {
+ throw new UDFArgumentLengthException("The function execute_splits accepts 2 arguments.");
+ } else if (!(arguments[0] instanceof StringObjectInspector)) {
+ LOG.error("Got "+arguments[0].getTypeName()+" instead of string.");
+ throw new UDFArgumentTypeException(0, "\""
+ + "string\" is expected at function execute_splits, " + "but \""
+ + arguments[0].getTypeName() + "\" is found");
+ } else if (!(arguments[1] instanceof IntObjectInspector)) {
+ LOG.error("Got "+arguments[1].getTypeName()+" instead of int.");
+ throw new UDFArgumentTypeException(1, "\""
+ + "int\" is expected at function execute_splits, " + "but \""
+ + arguments[1].getTypeName() + "\" is found");
+ }
+
+ stringOI = (StringObjectInspector) arguments[0];
+ intOI = (IntObjectInspector) arguments[1];
+
+ List<String> names = Arrays.asList("split_num","value");
+ List<ObjectInspector> fieldOIs = Arrays.<ObjectInspector>asList(
+ PrimitiveObjectInspectorFactory.javaIntObjectInspector,
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector);
+ StructObjectInspector outputOI = ObjectInspectorFactory.getStandardStructObjectInspector(names, fieldOIs);
+
+ LOG.debug("done initializing GenericUDTFExecuteSplits");
+ return outputOI;
+ }
+
+ @Override
+ public void process(Object[] arguments) throws HiveException {
+
+ String query = stringOI.getPrimitiveJavaObject(arguments[0]);
+ int num = intOI.get(arguments[1]);
+
+ PlanFragment fragment = createPlanFragment(query, num);
+ try {
+ InputFormat<NullWritable, Text> format = (InputFormat<NullWritable,Text>)(Class.forName("org.apache.hadoop.hive.llap.LlapInputFormat").newInstance());
+ int index = 0;
+ for (InputSplit s: getSplits(jc, num, fragment.work, fragment.schema)) {
+ RecordReader<NullWritable, Text> reader = format.getRecordReader(s,fragment.jc,null);
+ Text value = reader.createValue();
+ NullWritable key = reader.createKey();
+ index++;
+ while(reader.next(key,value)) {
+ Object[] os = new Object[2];
+ os[0] = index;
+ os[1] = value.toString();
+ forward(os);
+ }
+ }
+ } catch(Exception e) {
+ throw new HiveException(e);
+ }
+ }
+
+ @Override
+ public void close() throws HiveException {
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
new file mode 100644
index 0000000..ebb0ca5
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -0,0 +1,420 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.LlapInputSplit;
+import org.apache.hadoop.hive.llap.LlapOutputFormat;
+import org.apache.hadoop.hive.llap.SubmitWorkInfo;
+import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
+import org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.udf.UDFType;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TaskLocationHint;
+import org.apache.tez.dag.api.TaskSpecBuilder;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * GenericUDTFGetSplits.
+ *
+ */
+@Description(name = "get_splits", value = "_FUNC_(string,int) - "
+ + "Returns an array of length int serialized splits for the referenced tables string.")
+@UDFType(deterministic = false)
+public class GenericUDTFGetSplits extends GenericUDTF {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GenericUDTFGetSplits.class);
+
+ private static final String LLAP_INTERNAL_INPUT_FORMAT_NAME = "org.apache.hadoop.hive.llap.LlapInputFormat";
+
+ protected transient StringObjectInspector stringOI;
+ protected transient IntObjectInspector intOI;
+ protected transient JobConf jc;
+ private ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
+ private DataOutput dos = new DataOutputStream(bos);
+
+ @Override
+ public StructObjectInspector initialize(ObjectInspector[] arguments)
+ throws UDFArgumentException {
+
+ LOG.debug("initializing GenericUDFGetSplits");
+
+ if (SessionState.get() == null || SessionState.get().getConf() == null) {
+ throw new IllegalStateException("Cannot run get splits outside HS2");
+ }
+
+ LOG.debug("Initialized conf, jc and metastore connection");
+
+ if (arguments.length != 2) {
+ throw new UDFArgumentLengthException("The function GET_SPLITS accepts 2 arguments.");
+ } else if (!(arguments[0] instanceof StringObjectInspector)) {
+ LOG.error("Got "+arguments[0].getTypeName()+" instead of string.");
+ throw new UDFArgumentTypeException(0, "\""
+ + "string\" is expected at function GET_SPLITS, " + "but \""
+ + arguments[0].getTypeName() + "\" is found");
+ } else if (!(arguments[1] instanceof IntObjectInspector)) {
+ LOG.error("Got "+arguments[1].getTypeName()+" instead of int.");
+ throw new UDFArgumentTypeException(1, "\""
+ + "int\" is expected at function GET_SPLITS, " + "but \""
+ + arguments[1].getTypeName() + "\" is found");
+ }
+
+ stringOI = (StringObjectInspector) arguments[0];
+ intOI = (IntObjectInspector) arguments[1];
+
+ List<String> names = Arrays.asList("if_class","split_class","split");
+ List<ObjectInspector> fieldOIs = Arrays.<ObjectInspector>asList(
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+ PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+ PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector);
+ StructObjectInspector outputOI = ObjectInspectorFactory.getStandardStructObjectInspector(names, fieldOIs);
+
+ LOG.debug("done initializing GenericUDFGetSplits");
+ return outputOI;
+ }
+
+ public static class PlanFragment {
+ public JobConf jc;
+ public TezWork work;
+ public Schema schema;
+
+ public PlanFragment(TezWork work, Schema schema, JobConf jc) {
+ this.work = work;
+ this.schema = schema;
+ this.jc = jc;
+ }
+ }
+
+ @Override
+ public void process(Object[] arguments) throws HiveException {
+
+ String query = stringOI.getPrimitiveJavaObject(arguments[0]);
+ int num = intOI.get(arguments[1]);
+
+ PlanFragment fragment = createPlanFragment(query, num);
+ TezWork tezWork = fragment.work;
+ Schema schema = fragment.schema;
+
+ try {
+ for (InputSplit s: getSplits(jc, num, tezWork, schema)) {
+ Object[] os = new Object[3];
+ os[0] = LLAP_INTERNAL_INPUT_FORMAT_NAME;
+ os[1] = s.getClass().getName();
+ bos.reset();
+ s.write(dos);
+ byte[] frozen = bos.toByteArray();
+ os[2] = frozen;
+ forward(os);
+ }
+ } catch(Exception e) {
+ throw new HiveException(e);
+ }
+ }
+
+ public PlanFragment createPlanFragment(String query, int num) throws HiveException {
+
+ HiveConf conf = new HiveConf(SessionState.get().getConf());
+ HiveConf.setVar(conf, ConfVars.HIVEFETCHTASKCONVERSION, "none");
+ HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, "Llap");
+
+ String originalMode = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE);
+ HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE, "llap");
+ HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_TEZ_GENERATE_CONSISTENT_SPLITS, true);
+ HiveConf.setBoolVar(conf, HiveConf.ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS, true);
+
+ try {
+ jc = DagUtils.getInstance().createConfiguration(conf);
+ } catch (IOException e) {
+ throw new HiveException(e);
+ }
+
+ Driver driver = new Driver(conf);
+ CommandProcessorResponse cpr;
+
+ LOG.info("setting fetch.task.conversion to none and query file format to \""
+ + LlapOutputFormat.class.getName()+"\"");
+
+ cpr = driver.compileAndRespond(query);
+ if(cpr.getResponseCode() != 0) {
+ throw new HiveException("Failed to compile query: "+cpr.getException());
+ }
+
+ QueryPlan plan = driver.getPlan();
+ List<Task<?>> roots = plan.getRootTasks();
+ Schema schema = plan.getResultSchema();
+
+ if (roots == null || roots.size() != 1 || !(roots.get(0) instanceof TezTask)) {
+ throw new HiveException("Was expecting a single TezTask.");
+ }
+
+ TezWork tezWork = ((TezTask)roots.get(0)).getWork();
+
+ if (tezWork.getAllWork().size() != 1) {
+
+ String tableName = "table_"+UUID.randomUUID().toString().replaceAll("[^A-Za-z0-9 ]", "");
+
+ String ctas = "create temporary table "+tableName+" as "+query;
+ LOG.info("CTAS: "+ctas);
+
+ try {
+ HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE, originalMode);
+ cpr = driver.run(ctas, false);
+ } catch(CommandNeedRetryException e) {
+ throw new HiveException(e);
+ }
+
+ if(cpr.getResponseCode() != 0) {
+ throw new HiveException("Failed to create temp table: " + cpr.getException());
+ }
+
+ HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE, "llap");
+ query = "select * from " + tableName;
+ cpr = driver.compileAndRespond(query);
+ if(cpr.getResponseCode() != 0) {
+        throw new HiveException("Failed to compile query: "+cpr.getException());
+ }
+
+ plan = driver.getPlan();
+ roots = plan.getRootTasks();
+ schema = plan.getResultSchema();
+
+ if (roots == null || roots.size() != 1 || !(roots.get(0) instanceof TezTask)) {
+ throw new HiveException("Was expecting a single TezTask.");
+ }
+
+ tezWork = ((TezTask)roots.get(0)).getWork();
+ }
+
+ return new PlanFragment(tezWork, schema, jc);
+ }
+
+ public InputSplit[] getSplits(JobConf job, int numSplits, TezWork work, Schema schema)
+ throws IOException {
+
+ DAG dag = DAG.create(work.getName());
+ dag.setCredentials(job.getCredentials());
+ // TODO: set access control? TezTask.setAccessControlsForCurrentUser(dag);
+
+ DagUtils utils = DagUtils.getInstance();
+ Context ctx = new Context(job);
+ MapWork mapWork = (MapWork) work.getAllWork().get(0);
+    // A number of things get set up in the Context based on conf, but we only need the MR tmp
+    // directory for the following call.
+ JobConf wxConf = utils.initializeVertexConf(job, ctx, mapWork);
+ Path scratchDir = utils.createTezDir(ctx.getMRScratchDir(), job);
+ FileSystem fs = scratchDir.getFileSystem(job);
+ try {
+ LocalResource appJarLr = createJarLocalResource(utils.getExecJarPathLocal(), utils, job);
+ Vertex wx = utils.createVertex(wxConf, mapWork, scratchDir, appJarLr,
+ new ArrayList<LocalResource>(), fs, ctx, false, work,
+ work.getVertexType(mapWork));
+ String vertexName = wx.getName();
+ dag.addVertex(wx);
+ utils.addCredentials(mapWork, dag);
+
+
+ // we have the dag now proceed to get the splits:
+ HiveSplitGenerator splitGenerator = new HiveSplitGenerator(null);
+ Preconditions.checkState(HiveConf.getBoolVar(wxConf,
+ HiveConf.ConfVars.HIVE_TEZ_GENERATE_CONSISTENT_SPLITS));
+ Preconditions.checkState(HiveConf.getBoolVar(wxConf,
+ HiveConf.ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS));
+ splitGenerator.initializeSplitGenerator(wxConf, mapWork);
+ List<Event> eventList = splitGenerator.initialize();
+
+ // hack - just serializing with kryo for now. This needs to be done properly
+ InputSplit[] result = new InputSplit[eventList.size() - 1];
+ DataOutputBuffer dob = new DataOutputBuffer();
+
+ InputConfigureVertexTasksEvent configureEvent
+ = (InputConfigureVertexTasksEvent) eventList.get(0);
+
+ List<TaskLocationHint> hints = configureEvent.getLocationHint().getTaskLocationHints();
+
+ Preconditions.checkState(hints.size() == eventList.size() - 1);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("NumEvents=" + eventList.size());
+ LOG.debug("NumSplits=" + result.length);
+ }
+
+ ApplicationId fakeApplicationId
+ = ApplicationId.newInstance(Math.abs(new Random().nextInt()), 0);
+
+ LOG.info("Number of splits: " + (eventList.size() - 1));
+ for (int i = 0; i < eventList.size() - 1; i++) {
+
+ TaskSpec taskSpec =
+ new TaskSpecBuilder().constructTaskSpec(dag, vertexName,
+ eventList.size() - 1, fakeApplicationId, i);
+
+ SubmitWorkInfo submitWorkInfo =
+ new SubmitWorkInfo(taskSpec, fakeApplicationId, System.currentTimeMillis());
+ EventMetaData sourceMetaData =
+ new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, vertexName,
+ "NULL_VERTEX", null);
+ EventMetaData destinationMetaInfo = new TaskSpecBuilder().getDestingationMetaData(wx);
+
+ // Creating the TezEvent here itself, since it's easy to serialize.
+ Event event = eventList.get(i + 1);
+ TaskLocationHint hint = hints.get(i);
+ Set<String> hosts = hint.getHosts();
+ if (hosts.size() != 1) {
+ LOG.warn("Bad # of locations: " + hosts.size());
+ }
+ SplitLocationInfo[] locations = new SplitLocationInfo[hosts.size()];
+
+ int j = 0;
+ for (String host : hosts) {
+ locations[j++] = new SplitLocationInfo(host, false);
+ }
+ TezEvent tezEvent = new TezEvent(event, sourceMetaData, System.currentTimeMillis());
+ tezEvent.setDestinationInfo(destinationMetaInfo);
+
+ bos.reset();
+ dob.reset();
+ tezEvent.write(dob);
+
+ byte[] submitWorkBytes = SubmitWorkInfo.toBytes(submitWorkInfo);
+
+ result[i] = new LlapInputSplit(i, submitWorkBytes, dob.getData(), locations, schema);
+ }
+ return result;
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Returns a local resource representing a jar. This resource will be used to execute the plan on
+ * the cluster.
+ *
+ * @param localJarPath
+ * Local path to the jar to be localized.
+ * @return LocalResource corresponding to the localized hive exec resource.
+ * @throws IOException
+ * when any file system related call fails.
+ * @throws LoginException
+ * when we are unable to determine the user.
+ * @throws URISyntaxException
+ * when current jar location cannot be determined.
+ */
+ private LocalResource createJarLocalResource(String localJarPath, DagUtils utils,
+ Configuration conf)
+ throws IOException, LoginException, IllegalArgumentException, FileNotFoundException {
+ FileStatus destDirStatus = utils.getHiveJarDirectory(conf);
+ assert destDirStatus != null;
+ Path destDirPath = destDirStatus.getPath();
+
+ Path localFile = new Path(localJarPath);
+ String sha = getSha(localFile, conf);
+
+ String destFileName = localFile.getName();
+
+ // Now, try to find the file based on SHA and name. Currently we require exact name match.
+ // We could also allow cutting off versions and other stuff provided that SHA matches...
+ destFileName = FilenameUtils.removeExtension(destFileName) + "-" + sha
+ + FilenameUtils.EXTENSION_SEPARATOR + FilenameUtils.getExtension(destFileName);
+
+ // TODO: if this method is ever called on more than one jar, getting the dir and the
+ // list need to be refactored out to be done only once.
+ Path destFile = new Path(destDirPath.toString() + "/" + destFileName);
+ return utils.localizeResource(localFile, destFile, LocalResourceType.FILE, conf);
+ }
+
+ private String getSha(Path localFile, Configuration conf)
+ throws IOException, IllegalArgumentException {
+ InputStream is = null;
+ try {
+ FileSystem localFs = FileSystem.getLocal(conf);
+ is = localFs.open(localFile);
+ return DigestUtils.sha256Hex(is);
+ } finally {
+ if (is != null) {
+ is.close();
+ }
+ }
+ }
+
+ @Override
+ public void close() throws HiveException {
+ }
+}
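Each row forwarded by GenericUDTFGetSplits carries the InputFormat class name, the split
class name, and the Writable-serialized split bytes; GenericUDTFExecuteSplits above is the
in-process consumer of that triple. The sketch below shows how an external caller could do
the same thing. It is illustrative only: the class name GetSplitsClientSketch and method
readSplit are made up here, the split class is assumed to be Writable with a no-arg
constructor, and the JobConf is assumed to carry whatever LLAP client settings the cluster
needs.

  import java.io.ByteArrayInputStream;
  import java.io.DataInputStream;

  import org.apache.hadoop.io.NullWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapred.InputFormat;
  import org.apache.hadoop.mapred.InputSplit;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.RecordReader;

  public class GetSplitsClientSketch {
    // Rebuild one split emitted by get_splits and print the rows it yields.
    public static void readSplit(String ifClass, String splitClass, byte[] splitBytes,
        JobConf jobConf) throws Exception {
      // The split column is whatever InputSplit.write() produced in process().
      InputSplit split = (InputSplit) Class.forName(splitClass).newInstance();
      split.readFields(new DataInputStream(new ByteArrayInputStream(splitBytes)));

      // Instantiate the InputFormat named in the first column reflectively, the same
      // way GenericUDTFExecuteSplits does, and pull records via the mapred API.
      @SuppressWarnings("unchecked")
      InputFormat<NullWritable, Text> format =
          (InputFormat<NullWritable, Text>) Class.forName(ifClass).newInstance();
      RecordReader<NullWritable, Text> reader = format.getRecordReader(split, jobConf, null);
      NullWritable key = reader.createKey();
      Text value = reader.createValue();
      try {
        while (reader.next(key, value)) {
          System.out.println(value.toString());
        }
      } finally {
        reader.close();
      }
    }
  }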
http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java b/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java
index 5cabb6a..5db8c48 100644
--- a/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java
+++ b/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java
@@ -17,7 +17,7 @@ import org.apache.tez.runtime.api.impl.TaskSpec;
// Proxy class within the tez.api package to access package private methods.
public class TaskSpecBuilder {
- public TaskSpec constructTaskSpec(DAG dag, String vertexName, int numSplits, ApplicationId appId) {
+ public TaskSpec constructTaskSpec(DAG dag, String vertexName, int numSplits, ApplicationId appId, int index) {
Vertex vertex = dag.getVertex(vertexName);
ProcessorDescriptor processorDescriptor = vertex.getProcessorDescriptor();
List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputs =
@@ -43,7 +43,7 @@ public class TaskSpecBuilder {
TezDAGID dagId = TezDAGID.getInstance(appId, 0);
TezVertexID vertexId = TezVertexID.getInstance(dagId, 0);
- TezTaskID taskId = TezTaskID.getInstance(vertexId, 0);
+ TezTaskID taskId = TezTaskID.getInstance(vertexId, index);
TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 0);
return new TaskSpec(taskAttemptId, dag.getName(), vertexName, numSplits, processorDescriptor, inputSpecs, outputSpecs, null);
}
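The new index argument is what lets GenericUDTFGetSplits build a distinct TaskSpec per
split instead of pointing every split at task 0 of the fake vertex. A small illustration
of how the identifiers line up, assuming the same Tez classes already imported by
TaskSpecBuilder and an arbitrary fake ApplicationId as in getSplits():

  ApplicationId appId = ApplicationId.newInstance(12345, 0);   // fake id, as in getSplits()
  TezDAGID dagId = TezDAGID.getInstance(appId, 0);
  TezVertexID vertexId = TezVertexID.getInstance(dagId, 0);
  // Before this change every call produced task 0; with the index argument,
  // split i is backed by task i, so each LlapInputSplit gets its own attempt id.
  TezTaskID taskForSplit3 = TezTaskID.getInstance(vertexId, 3);
  TezTaskAttemptID attemptForSplit3 = TezTaskAttemptID.getInstance(taskForSplit3, 0);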
http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
index c49231c..7b516fe 100644
--- a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
@@ -79,46 +79,52 @@ public class TestLlapOutputFormat {
@Test
public void testValues() throws Exception {
JobConf job = new JobConf();
- job.set(LlapOutputFormat.LLAP_OF_ID_KEY, "foobar");
- LlapOutputFormat format = new LlapOutputFormat();
- HiveConf conf = new HiveConf();
- Socket socket = new Socket("localhost",
- conf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT));
+ for (int k = 0; k < 5; ++k) {
+ String id = "foobar"+k;
+ job.set(LlapOutputFormat.LLAP_OF_ID_KEY, id);
+ LlapOutputFormat format = new LlapOutputFormat();
- LOG.debug("Socket connected");
+ HiveConf conf = new HiveConf();
+ Socket socket = new Socket("localhost",
+ conf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT));
- socket.getOutputStream().write("foobar".getBytes());
- socket.getOutputStream().write(0);
- socket.getOutputStream().flush();
+ LOG.debug("Socket connected");
- Thread.sleep(3000);
+ socket.getOutputStream().write(id.getBytes());
+ socket.getOutputStream().write(0);
+ socket.getOutputStream().flush();
- LOG.debug("Data written");
+ Thread.sleep(3000);
- RecordWriter<NullWritable, Text> writer = format.getRecordWriter(null, job, null, null);
- Text text = new Text();
+ LOG.debug("Data written");
- LOG.debug("Have record writer");
+ RecordWriter<NullWritable, Text> writer = format.getRecordWriter(null, job, null, null);
+ Text text = new Text();
- for (int i = 0; i < 10; ++i) {
- text.set(""+i);
- writer.write(NullWritable.get(),text);
- }
+ LOG.debug("Have record writer");
- writer.close(null);
+ for (int i = 0; i < 10; ++i) {
+ text.set(""+i);
+ writer.write(NullWritable.get(),text);
+ }
- InputStream in = socket.getInputStream();
- RecordReader reader = new LlapRecordReader(in, null, Text.class);
+ writer.close(null);
- LOG.debug("Have record reader");
+ InputStream in = socket.getInputStream();
+ RecordReader reader = new LlapRecordReader(in, null, Text.class);
- int count = 0;
- while(reader.next(NullWritable.get(), text)) {
- LOG.debug(text.toString());
- count++;
- }
+ LOG.debug("Have record reader");
+
+ int count = 0;
+ while(reader.next(NullWritable.get(), text)) {
+ LOG.debug(text.toString());
+ count++;
+ }
- Assert.assertEquals(count,10);
+ reader.close();
+
+      Assert.assertEquals(10, count);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/ql/src/test/queries/clientpositive/udf_get_splits.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/udf_get_splits.q b/ql/src/test/queries/clientpositive/udf_get_splits.q
deleted file mode 100644
index 70400e8..0000000
--- a/ql/src/test/queries/clientpositive/udf_get_splits.q
+++ /dev/null
@@ -1,6 +0,0 @@
-set hive.fetch.task.conversion=more;
-
-DESCRIBE FUNCTION get_splits;
-DESCRIBE FUNCTION EXTENDED get_splits;
-
-select r.if_class as ic, r.split_class as sc, hash(r.split) as h, length(r.split) as l from (select explode(get_splits("select key, count(*) from src where key % 2 = 0 group by key",5)) as r) t;
http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/ql/src/test/queries/clientpositive/udtf_get_splits.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/udtf_get_splits.q b/ql/src/test/queries/clientpositive/udtf_get_splits.q
new file mode 100644
index 0000000..f378dca
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/udtf_get_splits.q
@@ -0,0 +1,43 @@
+set hive.fetch.task.conversion=more;
+set hive.mapred.mode=nonstrict;
+set mapred.max.split.size=100;
+set mapred.min.split.size.per.node=100;
+set mapred.min.split.size.per.rack=100;
+set mapred.max.split.size=100;
+set tez.grouping.max-size=100;
+set tez.grouping.min-size=100;
+
+DESCRIBE FUNCTION get_splits;
+DESCRIBE FUNCTION execute_splits;
+
+select r1, r2, floor(length(r3)/100000)
+from
+ (select
+ get_splits(
+ "select key, count(*) from srcpart where key % 2 = 0 group by key",
+ 5) as (r1, r2, r3)) t;
+
+select r1, r2, floor(length(r3)/100000)
+from
+ (select
+ get_splits(
+ "select key from srcpart where key % 2 = 0",
+ 5) as (r1, r2, r3)) t;
+
+show tables;
+
+select r1, r2
+from
+ (select
+ execute_splits(
+ "select key from srcpart where key % 2 = 0",
+ 1) as (r1, r2)) t;
+
+select r1, r2
+from
+ (select
+ execute_splits(
+ "select key from srcpart where key % 2 = 0",
+ 5) as (r1, r2)) t;
+
+select count(*) from (select key from srcpart where key % 2 = 0) t;