You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2016/05/06 17:24:56 UTC

[06/39] hive git commit: HIVE-13162: Fixes for LlapDump and FileSinkoperator

HIVE-13162: Fixes for LlapDump and FileSinkoperator


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/57761e34
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/57761e34
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/57761e34

Branch: refs/heads/master
Commit: 57761e34ffec80487a0e678bea6d2dc87bfc9b11
Parents: 7c4b3ff
Author: Gunther Hagleitner <gu...@apache.org>
Authored: Thu Feb 25 16:54:52 2016 -0800
Committer: Gunther Hagleitner <gu...@apache.org>
Committed: Thu Feb 25 17:20:38 2016 -0800

----------------------------------------------------------------------
 .../test/resources/testconfiguration.properties |    4 +-
 .../org/apache/hadoop/hive/ql/QTestUtil.java    |    3 +-
 .../src/java/org/apache/hive/jdbc/LlapDump.java |   16 +-
 .../org/apache/hive/jdbc/LlapInputFormat.java   |    4 +-
 .../hadoop/hive/llap/LlapInputFormat.java       |   56 +-
 .../llap/daemon/impl/ContainerRunnerImpl.java   |   10 +-
 .../ext/LlapTaskUmbilicalExternalClient.java    |    8 -
 .../llap/tezplugins/LlapTaskCommunicator.java   |    2 +-
 .../tezplugins/LlapTaskSchedulerService.java    |    4 +-
 .../hadoop/hive/llap/LlapInputFormat.java       |  159 --
 .../hive/llap/LlapOutputFormatService.java      |   77 +-
 .../hadoop/hive/llap/LlapRecordWriter.java      |   10 +-
 .../hadoop/hive/ql/exec/FileSinkOperator.java   |   13 +
 .../hadoop/hive/ql/exec/FunctionRegistry.java   |    4 +-
 .../hive/ql/exec/tez/HiveSplitGenerator.java    |    1 +
 .../hive/ql/exec/tez/MapRecordProcessor.java    |    4 +-
 .../hive/ql/io/HivePassThroughRecordWriter.java |    4 +
 .../ql/udf/generic/GenericUDFGetSplits.java     |  412 ----
 .../udf/generic/GenericUDTFExecuteSplits.java   |  124 +
 .../ql/udf/generic/GenericUDTFGetSplits.java    |  420 ++++
 .../org/apache/tez/dag/api/TaskSpecBuilder.java |    4 +-
 .../hadoop/hive/llap/TestLlapOutputFormat.java  |   62 +-
 .../queries/clientpositive/udf_get_splits.q     |    6 -
 .../queries/clientpositive/udtf_get_splits.q    |   43 +
 .../clientpositive/llap/udtf_get_splits.q.out   | 2130 ++++++++++++++++++
 25 files changed, 2874 insertions(+), 706 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index deb9905..13b5113 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -435,7 +435,6 @@ minitez.query.files=bucket_map_join_tez1.q,\
   tez_smb_main.q,\
   tez_smb_1.q,\
   tez_smb_empty.q,\
-  udf_get_splits.q,\
   vector_join_part_col_char.q,\
   vectorized_dynamic_partition_pruning.q,\
   tez_multi_union.q,\
@@ -495,7 +494,8 @@ minillap.query.files=bucket_map_join_tez1.q,\
   vectorized_dynamic_partition_pruning.q,\
   tez_multi_union.q,\
   tez_join.q,\
-  tez_union_multiinsert.q
+  tez_union_multiinsert.q,\
+  udtf_get_splits.q
 
 encrypted.query.files=encryption_join_unencrypted_tbl.q,\
   encryption_insert_partition_static.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index 5e81e98..e524bd2 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -1460,7 +1460,8 @@ public class QTestUtil {
       ".*Input:.*/data/files/.*",
       ".*Output:.*/data/files/.*",
       ".*total number of created files now is.*",
-      ".*.hive-staging.*"
+      ".*.hive-staging.*",
+      "table_.*"
   });
 
   private final Pattern[] partialReservedPlanMask = toPattern(new String[] {

http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java b/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java
index a807f6c..7ed0a0e 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/LlapDump.java
@@ -24,6 +24,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.FileInputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -61,6 +62,7 @@ public class LlapDump {
   private static String user = "hive";
   private static String pwd = "";
   private static String query = "select * from test";
+  private static String numSplits = "1";
 
   public static void main(String[] args) throws Exception {
     Options opts = createOptions();
@@ -84,6 +86,10 @@ public class LlapDump {
       pwd = cli.getOptionValue("p");
     }
 
+    if (cli.hasOption('n')) {
+      numSplits = cli.getOptionValue("n");
+    }
+
     if (cli.getArgs().length > 0) {
       query = cli.getArgs()[0];
     }
@@ -95,7 +101,7 @@ public class LlapDump {
     LlapInputFormat format = new LlapInputFormat(url, user, pwd, query);
     JobConf job = new JobConf();
 
-    InputSplit[] splits = format.getSplits(job, 1);
+    InputSplit[] splits = format.getSplits(job, Integer.parseInt(numSplits));
 
     if (splits.length == 0) {
       System.out.println("No splits returned - empty scan");
@@ -104,6 +110,7 @@ public class LlapDump {
       boolean first = true;
 
       for (InputSplit s: splits) {
+        LOG.info("Processing input split s from " + Arrays.toString(s.getLocations()));
         RecordReader<NullWritable, Text> reader = format.getRecordReader(s, job, null);
 
         if (reader instanceof LlapRecordReader && first) {
@@ -122,6 +129,7 @@ public class LlapDump {
           System.out.println(value);
         }
       }
+      System.exit(0);
     }
   }
 
@@ -146,6 +154,12 @@ public class LlapDump {
         .hasArg()
         .create('p'));
 
+    result.addOption(OptionBuilder
+        .withLongOpt("num")
+        .withDescription("number of splits")
+        .hasArg()
+        .create('n'));
+
     return result;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java b/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
index 5af2175..9a7c16d 100644
--- a/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
+++ b/jdbc/src/java/org/apache/hive/jdbc/LlapInputFormat.java
@@ -64,6 +64,8 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
   public final String USER_KEY = "llap.if.user";
   public final String PWD_KEY = "llap.if.pwd";
 
+  public final String SPLIT_QUERY = "select get_splits(\"%s\",%d)";
+
   private Connection con;
   private Statement stmt;
 
@@ -105,7 +107,7 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
     try {
       con = DriverManager.getConnection(url,user,pwd);
       stmt = con.createStatement();
-      String sql = "select r.if_class as ic, r.split_class as sc, r.split as s from (select explode(get_splits(\""+query+"\","+numSplits+")) as r) t";
+      String sql = String.format(SPLIT_QUERY, query, numSplits);
       ResultSet res = stmt.executeQuery(sql);
       while (res.next()) {
         // deserialize split

http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
index d8066d5..b32d662 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
@@ -67,7 +67,7 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
    */
   @Override
   public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job,
-                                                       Reporter reporter) throws IOException {
+      Reporter reporter) throws IOException {
 
     LlapInputSplit llapSplit = (LlapInputSplit) split;
     SubmitWorkInfo submitWorkInfo = SubmitWorkInfo.fromBytes(llapSplit.getPlanBytes());
@@ -75,22 +75,15 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
     // TODO HACK: Spark is built with Hive-1.2.1, does not have access to HiveConf.ConfVars.LLAP_DAEMON_RPC_PORT
     int llapSubmitPort = job.getInt("hive.llap.daemon.rpc.port", 15001);
 
-    LOG.info("ZZZ: DBG: Starting LlapTaskUmbilicalExternalClient");
-
     LlapTaskUmbilicalExternalClient llapClient =
-        new LlapTaskUmbilicalExternalClient(job, submitWorkInfo.getTokenIdentifier(),
-            submitWorkInfo.getToken());
+      new LlapTaskUmbilicalExternalClient(job, submitWorkInfo.getTokenIdentifier(),
+          submitWorkInfo.getToken());
     llapClient.init(job);
     llapClient.start();
 
-    LOG.info("ZZZ: DBG: Crated LlapClient");
-    // TODO KKK Shutdown the llap client.
-
     SubmitWorkRequestProto submitWorkRequestProto =
-        constructSubmitWorkRequestProto(submitWorkInfo, llapSplit.getSplitNum(),
-            llapClient.getAddress(), submitWorkInfo.getToken());
-
-    LOG.info("ZZZ: DBG: Created submitWorkRequest for: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString());
+      constructSubmitWorkRequestProto(submitWorkInfo, llapSplit.getSplitNum(),
+          llapClient.getAddress(), submitWorkInfo.getToken());
 
     TezEvent tezEvent = new TezEvent();
     DataInputBuffer dib = new DataInputBuffer();
@@ -116,7 +109,7 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
     socket.getOutputStream().write(0);
     socket.getOutputStream().flush();
 
-    LOG.debug("Registered id: " + id);
+    LOG.info("Registered id: " + id);
 
     return new LlapRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class);
   }
@@ -127,16 +120,16 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
   }
 
   private SubmitWorkRequestProto constructSubmitWorkRequestProto(SubmitWorkInfo submitWorkInfo,
-                                                                 int taskNum,
-                                                                 InetSocketAddress address,
-                                                                 Token<JobTokenIdentifier> token) throws
-      IOException {
+      int taskNum,
+      InetSocketAddress address,
+      Token<JobTokenIdentifier> token) throws
+        IOException {
     TaskSpec taskSpec = submitWorkInfo.getTaskSpec();
     ApplicationId appId = submitWorkInfo.getFakeAppId();
 
     SubmitWorkRequestProto.Builder builder = SubmitWorkRequestProto.newBuilder();
     // This works, assuming the executor is running within YARN.
-    LOG.info("DBG: Setting user in submitWorkRequest to: " +
+    LOG.info("Setting user in submitWorkRequest to: " +
         System.getenv(ApplicationConstants.Environment.USER.name()));
     builder.setUser(System.getenv(ApplicationConstants.Environment.USER.name()));
     builder.setApplicationIdString(appId.toString());
@@ -144,10 +137,9 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
     builder.setTokenIdentifier(appId.toString());
 
     ContainerId containerId =
-        ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, 0), taskNum);
+      ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, 0), taskNum);
     builder.setContainerIdString(containerId.toString());
 
-
     builder.setAmHost(address.getHostName());
     builder.setAmPort(address.getPort());
     Credentials taskCredentials = new Credentials();
@@ -155,18 +147,18 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
     // TODO Figure out where credentials will come from. Normally Hive sets up
     // URLs on the tez dag, for which Tez acquires credentials.
 
-//    taskCredentials.addAll(getContext().getCredentials());
-
-//    Preconditions.checkState(currentQueryIdentifierProto.getDagIdentifier() ==
-//        taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId());
-//    ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto);
-//    if (credentialsBinary == null) {
-//      credentialsBinary = serializeCredentials(getContext().getCredentials());
-//      credentialMap.putIfAbsent(currentQueryIdentifierProto, credentialsBinary.duplicate());
-//    } else {
-//      credentialsBinary = credentialsBinary.duplicate();
-//    }
-//    builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
+    //    taskCredentials.addAll(getContext().getCredentials());
+
+    //    Preconditions.checkState(currentQueryIdentifierProto.getDagIdentifier() ==
+    //        taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId());
+    //    ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto);
+    //    if (credentialsBinary == null) {
+    //      credentialsBinary = serializeCredentials(getContext().getCredentials());
+    //      credentialMap.putIfAbsent(currentQueryIdentifierProto, credentialsBinary.duplicate());
+    //    } else {
+    //      credentialsBinary = credentialsBinary.duplicate();
+    //    }
+    //    builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
     Credentials credentials = new Credentials();
     TokenCache.setSessionToken(token, credentials);
     ByteBuffer credentialsBinary = serializeCredentials(credentials);

http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index e80fb15..d47355a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -265,10 +265,10 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
     List<QueryFragmentInfo> knownFragments =
         queryTracker
             .queryComplete(queryIdentifier, request.getDeleteDelay());
-    LOG.info("DBG: Pending fragment count for completed query {} = {}", queryIdentifier,
+    LOG.info("Pending fragment count for completed query {} = {}", queryIdentifier,
         knownFragments.size());
     for (QueryFragmentInfo fragmentInfo : knownFragments) {
-      LOG.info("DBG: Issuing killFragment for completed query {} {}", queryIdentifier,
+      LOG.info("Issuing killFragment for completed query {} {}", queryIdentifier,
           fragmentInfo.getFragmentIdentifierString());
       executorService.killFragment(fragmentInfo.getFragmentIdentifierString());
     }
@@ -277,7 +277,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
 
   @Override
   public TerminateFragmentResponseProto terminateFragment(TerminateFragmentRequestProto request) {
-    LOG.info("DBG: Received terminateFragment request for {}", request.getFragmentIdentifierString());
+    LOG.info("Received terminateFragment request for {}", request.getFragmentIdentifierString());
     executorService.killFragment(request.getFragmentIdentifierString());
     return TerminateFragmentResponseProto.getDefaultInstance();
   }
@@ -356,10 +356,10 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
     LOG.info("Processing query failed notification for {}", queryIdentifier);
     List<QueryFragmentInfo> knownFragments =
         queryTracker.queryComplete(queryIdentifier, -1);
-    LOG.info("DBG: Pending fragment count for failed query {} = {}", queryIdentifier,
+    LOG.info("Pending fragment count for failed query {} = {}", queryIdentifier,
         knownFragments.size());
     for (QueryFragmentInfo fragmentInfo : knownFragments) {
-      LOG.info("DBG: Issuing killFragment for failed query {} {}", queryIdentifier,
+      LOG.info("Issuing killFragment for failed query {} {}", queryIdentifier,
           fragmentInfo.getFragmentIdentifierString());
       executorService.killFragment(fragmentInfo.getFragmentIdentifierString());
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
index 4305682..16cfd01 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -88,12 +88,6 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
   public void submitWork(final SubmitWorkRequestProto submitWorkRequestProto, String llapHost, int llapPort, List<TezEvent> tezEvents) {
     Preconditions.checkArgument(submitWorkRequestProto.getUsingTezAm() == false);
 
-
-    LOG.warn("ZZZ: DBG: " + " Submitting fragment: " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString() + " on host: " + llapHost + ", port=" + llapPort);
-//    LOG.info("ZZZ: DBG: " + " Complete SubmitWorkRequest: " + submitWorkRequestProto);
-//    submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString()
-
-    LOG.info("ZZZ: DBG: Received {} events for {}", tezEvents.size(), submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString());
     // Register the pending events to be sent for this spec.
     pendingEvents.putIfAbsent(submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString(), tezEvents);
 
@@ -109,7 +103,6 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
                 return;
               }
             }
-            LOG.info("DBG: Submitted " + submitWorkRequestProto.getFragmentSpec().getFragmentIdentifierString());
           }
 
           @Override
@@ -166,7 +159,6 @@ public class LlapTaskUmbilicalExternalClient extends AbstractService {
       TezHeartbeatResponse response = new TezHeartbeatResponse();
       // Assuming TaskAttemptId and FragmentIdentifierString are the same. Verify this.
       TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID();
-      LOG.info("ZZZ: DBG: Received heartbeat from taskAttemptId: " + taskAttemptId.toString());
 
       List<TezEvent> tezEvents = pendingEvents.remove(taskAttemptId.toString());
       if (tezEvents == null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index 76d095a..91e4323 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -349,7 +349,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
   private void sendTaskTerminated(final TezTaskAttemptID taskAttemptId,
                                   boolean invokedByContainerEnd) {
     LOG.info(
-        "DBG: Attempting to send terminateRequest for fragment {} due to internal preemption invoked by {}",
+        "Attempting to send terminateRequest for fragment {} due to internal preemption invoked by {}",
         taskAttemptId.toString(), invokedByContainerEnd ? "containerEnd" : "taskEnd");
     LlapNodeId nodeId = entityTracker.getNodeIdForTaskAttempt(taskAttemptId);
     // NodeId can be null if the task gets unregistered due to failure / being killed by the daemon itself

http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 3bca0da..e1ad12d 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -931,7 +931,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           }
         } else {
           // No tasks qualify as preemptable
-          LOG.info("DBG: No tasks qualify as killable to schedule tasks at priority {}", forPriority);
+          LOG.info("No tasks qualify as killable to schedule tasks at priority {}", forPriority);
           break;
         }
       }
@@ -941,7 +941,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     // Send out the preempted request outside of the lock.
     if (preemptedTaskList != null) {
       for (TaskInfo taskInfo : preemptedTaskList) {
-        LOG.info("DBG: Preempting task {}", taskInfo);
+        LOG.info("Preempting task {}", taskInfo);
         getContext().preemptContainer(taskInfo.containerId);
         // Preemption will finally be registered as a deallocateTask as a result of preemptContainer
         // That resets preemption info and allows additional tasks to be pre-empted if required.

http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/ql/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
deleted file mode 100644
index d308ec8..0000000
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.llap;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-
-import javax.security.auth.login.LoginException;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.tez.TezProcessor;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TaskSpecBuilder;
-import org.apache.tez.runtime.api.impl.TaskSpec;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.Socket;
-
-import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
-
-import com.esotericsoftware.kryo.Kryo;
-import java.io.ByteArrayOutputStream;
-import java.io.OutputStream;
-import java.io.InputStream;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URISyntaxException;
-import java.io.FileNotFoundException;
-import java.util.UUID;
-
-import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.commons.io.FilenameUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.SplitLocationInfo;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.runtime.api.Event;
-import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
-import org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator;
-import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.TezWork;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.Schema;
-import org.apache.tez.dag.api.TaskLocationHint;
-import org.apache.tez.dag.api.VertexLocationHint;
-import org.apache.tez.dag.api.event.VertexStateUpdate;
-import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
-import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
-import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
-import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
-import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.InputInitializer;
-import org.apache.tez.runtime.api.InputInitializerContext;
-import org.apache.tez.runtime.api.InputSpecUpdate;
-import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
-import org.apache.tez.runtime.api.events.InputDataInformationEvent;
-import org.apache.tez.runtime.api.events.InputInitializerEvent;
-
-import com.google.common.base.Preconditions;
-
-public class LlapInputFormat<V extends WritableComparable> implements InputFormat<NullWritable, V> {
-
-  private static final Logger LOG = LoggerFactory.getLogger(LlapInputFormat.class);
-
-  private final TezWork work;
-  private final Schema schema;
-
-  public LlapInputFormat(TezWork tezWork, Schema schema) {
-    this.work = tezWork;
-    this.schema = schema;
-  }
-
-  // need empty constructor for bean instantiation
-  public LlapInputFormat() {
-    // None of these fields should be required during getRecordReader,
-    // and should not be read.
-    work = null;
-    schema = null;
-  }
-
-  /*
-   * This proxy record reader has the duty of establishing a connected socket with LLAP, then fire
-   * off the work in the split to LLAP and finally return the connected socket back in an
-   * LlapRecordReader. The LlapRecordReader class reads the results from the socket.
-   */
-  @Override
-  public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
-
-    // Calls a static method to ensure none of the object fields are read.
-    return _getRecordReader(split, job, reporter);
-  }
-
-  private static RecordReader _getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws
-      IOException {
-    LlapInputSplit llapSplit = (LlapInputSplit)split;
-
-    // TODO: push event into LLAP
-
-    // this is just the portion that sets up the io to receive data
-    String host = split.getLocations()[0];
-
-    // TODO: need to construct id here. Format is queryId + "_" + taskIndex
-    String id = "foobar";
-
-    HiveConf conf = new HiveConf();
-    Socket socket = new Socket(host,
-        conf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT));
-
-    LOG.debug("Socket connected");
-
-    socket.getOutputStream().write(id.getBytes());
-    socket.getOutputStream().write(0);
-    socket.getOutputStream().flush();
-
-    LOG.debug("Registered id: " + id);
-
-    return new LlapRecordReader(socket.getInputStream(), llapSplit.getSchema(), Text.class);
-  }
-
-  @Override
-  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-    throw new IOException("These are not the splits you are looking for.");
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
index 4f38ff1..a197d7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
@@ -81,45 +81,45 @@ public class LlapOutputFormatService {
 
   public void start() throws IOException {
     executor.submit(new Runnable() {
-      byte[] buffer = new byte[4096];
-      @Override
-      public void run() {
-	while (true) {
-	  Socket s = null;
-	  try {
-	    s = socket.accept();
-	    String id = readId(s);
-	    LOG.debug("Received: "+id);
-	    registerReader(s, id);
-	  } catch (IOException io) {
-	    if (s != null) {
-	      try{
-		s.close();
-	      } catch (IOException io2) {
-		// ignore
-	      }
-	    }
-	  }
-	}
-      }
+        byte[] buffer = new byte[4096];
+        @Override
+        public void run() {
+          while (true) {
+            Socket s = null;
+            try {
+              s = socket.accept();
+              String id = readId(s);
+              LOG.debug("Received: "+id);
+              registerReader(s, id);
+            } catch (IOException io) {
+              if (s != null) {
+                try{
+                  s.close();
+                } catch (IOException io2) {
+                  // ignore
+                }
+              }
+            }
+          }
+        }
 
-    private String readId(Socket s) throws IOException {
-      InputStream in = s.getInputStream();
-      int idx = 0;
-      while((buffer[idx++] = (byte)in.read()) != '\0') {}
-      return new String(buffer,0,idx-1);
-    }
+        private String readId(Socket s) throws IOException {
+          InputStream in = s.getInputStream();
+          int idx = 0;
+          while((buffer[idx++] = (byte)in.read()) != '\0') {}
+          return new String(buffer,0,idx-1);
+        }
 
-    private void registerReader(Socket s, String id) throws IOException {
-      synchronized(service) {
-	LOG.debug("registering socket for: "+id);
-	LlapRecordWriter writer = new LlapRecordWriter(s.getOutputStream());
-        writers.put(id, writer);
-        service.notifyAll();
+        private void registerReader(Socket s, String id) throws IOException {
+          synchronized(service) {
+            LOG.debug("registering socket for: "+id);
+            LlapRecordWriter writer = new LlapRecordWriter(s.getOutputStream());
+            writers.put(id, writer);
+            service.notifyAll();
+          }
+        }
       }
-    }
-    }
-    );
+      );
   }
 
   public void stop() throws IOException, InterruptedException {
@@ -132,10 +132,11 @@ public class LlapOutputFormatService {
     RecordWriter writer = null;
     synchronized(service) {
       while ((writer = writers.get(id)) == null) {
-	LOG.debug("Waiting for writer for: "+id);
-	service.wait();
+        LOG.info("Waiting for writer for: "+id);
+        service.wait();
       }
     }
+    LOG.info("Returning writer for: "+id);
     return writer;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java
index 4d1996c..b632fae 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java
@@ -20,19 +20,18 @@ package org.apache.hadoop.hive.llap;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.io.DataOutputStream;;
+import java.io.DataOutputStream;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class LlapRecordWriter<K extends Writable, V extends WritableComparable>
   implements RecordWriter<K,V> {
+  public static final Logger LOG = LoggerFactory.getLogger(LlapRecordWriter.class);
 
   DataOutputStream dos;
 
@@ -42,6 +41,7 @@ public class LlapRecordWriter<K extends Writable, V extends WritableComparable>
 
   @Override
   public void close(Reporter reporter) throws IOException {
+    LOG.info("CLOSING the record writer output stream");
     dos.close();
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 0899793..02439be 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -36,11 +36,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.llap.LlapOutputFormat;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
@@ -202,6 +204,17 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
         }
       }
       try {
+        if ("org.apache.hadoop.hive.llap.LlapStorageHandler".equals(getConf().getTableInfo().getProperties().
+            get(hive_metastoreConstants.META_TABLE_STORAGE))) {
+          (new LlapOutputFormat())
+              .getRecordWriter(null,
+                  hconf instanceof JobConf ? (JobConf) hconf : new JobConf(hconf), null, null)
+              .close(null);
+        }
+      } catch (IOException e) {
+        // ignored
+      }
+      try {
         for (int i = 0; i < updaters.length; i++) {
           if (updaters[i] != null) {
             updaters[i].close(abort);

http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
index f3afa24..c782466 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
@@ -344,8 +344,6 @@ public final class FunctionRegistry {
     system.registerGenericUDF("ewah_bitmap_or", GenericUDFEWAHBitmapOr.class);
     system.registerGenericUDF("ewah_bitmap_empty", GenericUDFEWAHBitmapEmpty.class);
 
-    system.registerGenericUDF("get_splits", GenericUDFGetSplits.class);
-
     // Aliases for Java Class Names
     // These are used in getImplicitConvertUDFMethod
     system.registerUDF(serdeConstants.BOOLEAN_TYPE_NAME, UDFToBoolean.class, false, UDFToBoolean.class.getSimpleName());
@@ -444,6 +442,8 @@ public final class FunctionRegistry {
     system.registerGenericUDTF("parse_url_tuple", GenericUDTFParseUrlTuple.class);
     system.registerGenericUDTF("posexplode", GenericUDTFPosExplode.class);
     system.registerGenericUDTF("stack", GenericUDTFStack.class);
+    system.registerGenericUDTF("get_splits", GenericUDTFGetSplits.class);
+    system.registerGenericUDTF("execute_splits", GenericUDTFExecuteSplits.class);
 
     //PTF declarations
     system.registerGenericUDF(LEAD_FUNC_NAME, GenericUDFLead.class);

http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
index 011e459..b16368f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
@@ -92,6 +92,7 @@ public class HiveSplitGenerator extends InputInitializer {
 
     this.conf = conf;
     this.work = work;
+    this.jobConf = new JobConf(conf);
 
     // TODO RSHACK - assuming grouping enabled always.
     userPayloadProto = MRInputUserPayloadProto.newBuilder().setGroupingEnabled(true).build();

http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 3fe70ab..7a3d6a6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -95,7 +95,9 @@ public class MapRecordProcessor extends RecordProcessor {
     super(jconf, context);
     String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
     if (LlapProxy.isDaemon()) { // do not cache plan
-      jconf.set(LlapOutputFormat.LLAP_OF_ID_KEY, queryId + "_" + context.getTaskIndex());
+      String id = queryId + "_" + context.getTaskIndex();
+      l4j.info("LLAP_OF_ID: "+id);
+      jconf.set(LlapOutputFormat.LLAP_OF_ID_KEY, id);
       cache = new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
     } else {
       cache = ObjectCacheFactory.getCache(jconf, queryId);

http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java
index 454c321..6d00a0a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java
@@ -23,11 +23,14 @@ import java.io.IOException;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class HivePassThroughRecordWriter <K extends WritableComparable<?>, V extends Writable>
 implements RecordWriter {
 
+  public static final Logger LOG = LoggerFactory.getLogger(HivePassThroughRecordWriter.class);
   private final org.apache.hadoop.mapred.RecordWriter<K, V> mWriter;
 
   public HivePassThroughRecordWriter(org.apache.hadoop.mapred.RecordWriter<K, V> writer) {
@@ -42,6 +45,7 @@ implements RecordWriter {
 
   @Override
   public void close(boolean abort) throws IOException {
+    LOG.info("Closing the pass through writer.");
     //close with null reporter
     mWriter.close(null);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java
deleted file mode 100644
index f69dea3..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDFGetSplits.java
+++ /dev/null
@@ -1,412 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.udf.generic;
-
-import javax.security.auth.login.LoginException;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-import java.util.UUID;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.google.common.base.Preconditions;
-import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.commons.io.FilenameUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.LlapInputSplit;
-import org.apache.hadoop.hive.llap.LlapOutputFormat;
-import org.apache.hadoop.hive.llap.SubmitWorkInfo;
-import org.apache.hadoop.hive.metastore.api.Schema;
-import org.apache.hadoop.hive.ql.CommandNeedRetryException;
-import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.Driver;
-import org.apache.hadoop.hive.ql.QueryPlan;
-import org.apache.hadoop.hive.ql.exec.Description;
-import org.apache.hadoop.hive.ql.exec.MapredContext;
-import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
-import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
-import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
-import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
-import org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator;
-import org.apache.hadoop.hive.ql.exec.tez.TezTask;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.TezWork;
-import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.udf.UDFType;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SplitLocationInfo;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.TaskLocationHint;
-import org.apache.tez.dag.api.TaskSpecBuilder;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.runtime.api.Event;
-import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
-import org.apache.tez.runtime.api.impl.EventMetaData;
-import org.apache.tez.runtime.api.impl.TaskSpec;
-import org.apache.tez.runtime.api.impl.TezEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * GenericUDFGetSplits.
- *
- */
-@Description(name = "get_splits", value = "_FUNC_(string,int) - "
-    + "Returns an array of length int serialized splits for the referenced tables string.")
-@UDFType(deterministic = false)
-public class GenericUDFGetSplits extends GenericUDF {
-
-  private static final Logger LOG = LoggerFactory.getLogger(GenericUDFGetSplits.class);
-
-  private static final String LLAP_INTERNAL_INPUT_FORMAT_NAME = "org.apache.hadoop.hive.llap.LlapInputFormat";
-
-  private transient StringObjectInspector stringOI;
-  private transient IntObjectInspector intOI;
-  private final ArrayList<Object> retArray = new ArrayList<Object>();
-  private transient JobConf jc;
-  private transient Hive db;
-  private ByteArrayOutputStream bos;
-  private DataOutput dos;
-
-  @Override
-  public ObjectInspector initialize(ObjectInspector[] arguments)
-    throws UDFArgumentException {
-
-    LOG.debug("initializing GenericUDFGetSplits");
-
-    try {
-      if (SessionState.get() != null && SessionState.get().getConf() != null) {
-        HiveConf conf = SessionState.get().getConf();
-        jc = DagUtils.getInstance().createConfiguration(conf);
-        db = Hive.get(conf);
-      } else {
-        jc = MapredContext.get().getJobConf();
-        db = Hive.get();
-      }
-    } catch(Exception e) {
-      LOG.error("Failed to initialize: ",e);
-      throw new UDFArgumentException(e);
-    }
-
-    LOG.debug("Initialized conf, jc and metastore connection");
-
-    if (arguments.length != 2) {
-      throw new UDFArgumentLengthException("The function GET_SPLITS accepts 2 arguments.");
-    } else if (!(arguments[0] instanceof StringObjectInspector)) {
-      LOG.error("Got "+arguments[0].getTypeName()+" instead of string.");
-      throw new UDFArgumentTypeException(0, "\""
-          + "string\" is expected at function GET_SPLITS, " + "but \""
-          + arguments[0].getTypeName() + "\" is found");
-    } else if (!(arguments[1] instanceof IntObjectInspector)) {
-      LOG.error("Got "+arguments[1].getTypeName()+" instead of int.");
-      throw new UDFArgumentTypeException(1, "\""
-          + "int\" is expected at function GET_SPLITS, " + "but \""
-          + arguments[1].getTypeName() + "\" is found");
-    }
-
-    stringOI = (StringObjectInspector) arguments[0];
-    intOI = (IntObjectInspector) arguments[1];
-
-    List<String> names = Arrays.asList("if_class","split_class","split");
-    List<ObjectInspector> fieldOIs = Arrays.<ObjectInspector>asList(
-                                                                    PrimitiveObjectInspectorFactory.javaStringObjectInspector,
-                                                                    PrimitiveObjectInspectorFactory.javaStringObjectInspector,
-                                                                    PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector);
-    ObjectInspector outputOI = ObjectInspectorFactory.getStandardStructObjectInspector(names, fieldOIs);
-    ObjectInspector listOI = ObjectInspectorFactory.getStandardListObjectInspector(outputOI);
-    bos = new ByteArrayOutputStream(1024);
-    dos = new DataOutputStream(bos);
-
-    LOG.debug("done initializing GenericUDFGetSplits");
-    return listOI;
-  }
-
-  @Override
-  public Object evaluate(DeferredObject[] arguments) throws HiveException {
-    retArray.clear();
-
-    String query = stringOI.getPrimitiveJavaObject(arguments[0].get());
-
-    int num = intOI.get(arguments[1].get());
-
-    Driver driver = new Driver();
-    CommandProcessorResponse cpr;
-
-    HiveConf conf = SessionState.get().getConf();
-
-    if (conf == null) {
-      throw new HiveException("Need configuration");
-    }
-
-    String fetchTaskConversion = HiveConf.getVar(conf, ConfVars.HIVEFETCHTASKCONVERSION);
-    String queryResultFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
-
-    try {
-      LOG.info("setting fetch.task.conversion to none and query file format to \""+LlapOutputFormat.class.getName()+"\"");
-      HiveConf.setVar(conf, ConfVars.HIVEFETCHTASKCONVERSION, "none");
-      HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, "Llap");
-
-      cpr = driver.compileAndRespond(query);
-      if(cpr.getResponseCode() != 0) {
-        throw new HiveException("Failed to compile query: "+cpr.getException());
-      }
-
-      QueryPlan plan = driver.getPlan();
-      List<Task<?>> roots = plan.getRootTasks();
-      Schema schema = plan.getResultSchema();
-
-      if (roots == null || roots.size() != 1 || !(roots.get(0) instanceof TezTask)) {
-        throw new HiveException("Was expecting a single TezTask.");
-      }
-
-      TezWork tezWork = ((TezTask)roots.get(0)).getWork();
-
-      if (tezWork.getAllWork().size() != 1) {
-
-        String tableName = "table_"+UUID.randomUUID().toString().replaceAll("[^A-Za-z0-9 ]", "");
-
-        String ctas = "create temporary table "+tableName+" as "+query;
-        LOG.info("CTAS: "+ctas);
-
-        try {
-          cpr = driver.run(ctas, false);
-        } catch(CommandNeedRetryException e) {
-          throw new HiveException(e);
-        }
-
-        if(cpr.getResponseCode() != 0) {
-          throw new HiveException("Failed to create temp table: " + cpr.getException());
-        }
-
-        query = "select * from " + tableName;
-        cpr = driver.compileAndRespond(query);
-        if(cpr.getResponseCode() != 0) {
-          throw new HiveException("Failed to create temp table: "+cpr.getException());
-        }
-
-        plan = driver.getPlan();
-        roots = plan.getRootTasks();
-        schema = plan.getResultSchema();
-
-        if (roots == null || roots.size() != 1 || !(roots.get(0) instanceof TezTask)) {
-          throw new HiveException("Was expecting a single TezTask.");
-        }
-
-        tezWork = ((TezTask)roots.get(0)).getWork();
-      }
-
-      MapWork w = (MapWork)tezWork.getAllWork().get(0);
-
-      try {
-        for (InputSplit s: getSplits(jc, num, tezWork, schema)) {
-          Object[] os = new Object[3];
-          os[0] = LLAP_INTERNAL_INPUT_FORMAT_NAME;
-          os[1] = s.getClass().getName();
-          bos.reset();
-          s.write(dos);
-          byte[] frozen = bos.toByteArray();
-          os[2] = frozen;
-          retArray.add(os);
-        }
-      } catch(Exception e) {
-        throw new HiveException(e);
-      }
-    } finally {
-      HiveConf.setVar(conf, ConfVars.HIVEFETCHTASKCONVERSION, fetchTaskConversion);
-      HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, queryResultFormat);
-    }
-    return retArray;
-  }
-
-  public InputSplit[] getSplits(JobConf job, int numSplits, TezWork work, Schema schema) throws IOException {
-    DAG dag = DAG.create(work.getName());
-    dag.setCredentials(job.getCredentials());
-    // TODO: set access control? TezTask.setAccessControlsForCurrentUser(dag);
-
-    DagUtils utils = DagUtils.getInstance();
-    Context ctx = new Context(job);
-    MapWork mapWork = (MapWork) work.getAllWork().get(0);
-    // bunch of things get setup in the context based on conf but we need only the MR tmp directory
-    // for the following method.
-    JobConf wxConf = utils.initializeVertexConf(job, ctx, mapWork);
-    Path scratchDir = utils.createTezDir(ctx.getMRScratchDir(), job);
-    FileSystem fs = scratchDir.getFileSystem(job);
-    try {
-      LocalResource appJarLr = createJarLocalResource(utils.getExecJarPathLocal(), utils, job);
-      Vertex wx = utils.createVertex(wxConf, mapWork, scratchDir, appJarLr,
-          new ArrayList<LocalResource>(), fs, ctx, false, work,
-          work.getVertexType(mapWork));
-      String vertexName = wx.getName();
-      dag.addVertex(wx);
-      utils.addCredentials(mapWork, dag);
-
-
-      // we have the dag now proceed to get the splits:
-      HiveSplitGenerator splitGenerator = new HiveSplitGenerator(null);
-      Preconditions.checkState(HiveConf.getBoolVar(wxConf,
-          HiveConf.ConfVars.HIVE_TEZ_GENERATE_CONSISTENT_SPLITS));
-      Preconditions.checkState(HiveConf.getBoolVar(wxConf,
-          HiveConf.ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS));
-      splitGenerator.initializeSplitGenerator(wxConf, mapWork);
-      List<Event> eventList = splitGenerator.initialize();
-
-      // hack - just serializing with kryo for now. This needs to be done properly
-      InputSplit[] result = new InputSplit[eventList.size() - 1];
-      DataOutputBuffer dob = new DataOutputBuffer();
-
-      InputConfigureVertexTasksEvent configureEvent = (InputConfigureVertexTasksEvent) eventList.get(0);
-
-      List<TaskLocationHint> hints = configureEvent.getLocationHint().getTaskLocationHints();
-
-      Preconditions.checkState(hints.size() == eventList.size() -1);
-
-      LOG.error("DBG: NumEvents=" + eventList.size());
-      LOG.error("DBG: NumSplits=" + result.length);
-
-      ApplicationId fakeApplicationId = ApplicationId.newInstance(Math.abs(new Random().nextInt()), 0);
-      TaskSpec taskSpec =
-          new TaskSpecBuilder().constructTaskSpec(dag, vertexName, eventList.size() - 1, fakeApplicationId);
-
-      SubmitWorkInfo submitWorkInfo =
-          new SubmitWorkInfo(taskSpec, fakeApplicationId, System.currentTimeMillis());
-      EventMetaData sourceMetaData =
-          new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, vertexName,
-              "NULL_VERTEX", null);
-      EventMetaData destinationMetaInfo = new TaskSpecBuilder().getDestingationMetaData(wx);
-
-      LOG.info("DBG: Number of splits: " + (eventList.size() - 1));
-      for (int i = 0; i < eventList.size() - 1; i++) {
-        // Creating the TezEvent here itself, since it's easy to serialize.
-        Event event = eventList.get(i + 1);
-        TaskLocationHint hint = hints.get(i);
-        Set<String> hosts = hint.getHosts();
-        LOG.info("DBG: Using locations: " + hosts.toString());
-        if (hosts.size() != 1) {
-          LOG.warn("DBG: Bad # of locations: " + hosts.size());
-        }
-        SplitLocationInfo[] locations = new SplitLocationInfo[hosts.size()];
-
-        int j = 0;
-        for (String host : hosts) {
-          locations[j++] = new SplitLocationInfo(host, false);
-        }
-        TezEvent tezEvent = new TezEvent(event, sourceMetaData, System.currentTimeMillis());
-        tezEvent.setDestinationInfo(destinationMetaInfo);
-
-        bos.reset();
-        dob.reset();
-        tezEvent.write(dob);
-
-        byte[] submitWorkBytes = SubmitWorkInfo.toBytes(submitWorkInfo);
-
-        result[i] =
-            new LlapInputSplit(i, submitWorkBytes, dob.getData(), locations, schema);
-      }
-      return result;
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-  }
-
-    /**
-   * Returns a local resource representing a jar. This resource will be used to execute the plan on
-   * the cluster.
-   *
-   * @param localJarPath
-   *          Local path to the jar to be localized.
-   * @return LocalResource corresponding to the localized hive exec resource.
-   * @throws IOException
-   *           when any file system related call fails.
-   * @throws LoginException
-   *           when we are unable to determine the user.
-   * @throws URISyntaxException
-   *           when current jar location cannot be determined.
-   */
-  private LocalResource createJarLocalResource(String localJarPath, DagUtils utils,
-      Configuration conf)
-    throws IOException, LoginException, IllegalArgumentException, FileNotFoundException {
-    FileStatus destDirStatus = utils.getHiveJarDirectory(conf);
-    assert destDirStatus != null;
-    Path destDirPath = destDirStatus.getPath();
-
-    Path localFile = new Path(localJarPath);
-    String sha = getSha(localFile, conf);
-
-    String destFileName = localFile.getName();
-
-    // Now, try to find the file based on SHA and name. Currently we require exact name match.
-    // We could also allow cutting off versions and other stuff provided that SHA matches...
-    destFileName = FilenameUtils.removeExtension(destFileName) + "-" + sha
-      + FilenameUtils.EXTENSION_SEPARATOR + FilenameUtils.getExtension(destFileName);
-
-    // TODO: if this method is ever called on more than one jar, getting the dir and the
-    // list need to be refactored out to be done only once.
-    Path destFile = new Path(destDirPath.toString() + "/" + destFileName);
-    return utils.localizeResource(localFile, destFile, LocalResourceType.FILE, conf);
-  }
-
-  private String getSha(Path localFile, Configuration conf)
-    throws IOException, IllegalArgumentException {
-    InputStream is = null;
-    try {
-      FileSystem localFs = FileSystem.getLocal(conf);
-      is = localFs.open(localFile);
-      return DigestUtils.sha256Hex(is);
-    } finally {
-      if (is != null) {
-        is.close();
-      }
-    }
-  }
-
-  @Override
-  public String getDisplayString(String[] children) {
-    assert children.length == 2;
-    return getStandardDisplayString("get_splits", children);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExecuteSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExecuteSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExecuteSplits.java
new file mode 100644
index 0000000..12759ab
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFExecuteSplits.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.udf.UDFType;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDTFGetSplits.PlanFragment;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.RecordReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * GenericUDTFExecuteSplits.
+ *
+ */
+@Description(name = "execute_splits", value = "_FUNC_(string,int) - "
+    + "Returns an array of length int serialized splits for the referenced tables string.")
+@UDFType(deterministic = false)
+public class GenericUDTFExecuteSplits extends GenericUDTFGetSplits {
+
+  private static final Logger LOG = LoggerFactory.getLogger(GenericUDTFExecuteSplits.class);
+
+  @Override
+  public StructObjectInspector initialize(ObjectInspector[] arguments)
+    throws UDFArgumentException {
+
+    LOG.debug("initializing ExecuteSplits");
+
+    if (SessionState.get() == null || SessionState.get().getConf() == null) {
+      throw new IllegalStateException("Cannot run execute splits outside HS2");
+    }
+
+    if (arguments.length != 2) {
+      throw new UDFArgumentLengthException("The function execute_splits accepts 2 arguments.");
+    } else if (!(arguments[0] instanceof StringObjectInspector)) {
+      LOG.error("Got "+arguments[0].getTypeName()+" instead of string.");
+      throw new UDFArgumentTypeException(0, "\""
+          + "string\" is expected at function execute_splits, " + "but \""
+          + arguments[0].getTypeName() + "\" is found");
+    } else if (!(arguments[1] instanceof IntObjectInspector)) {
+      LOG.error("Got "+arguments[1].getTypeName()+" instead of int.");
+      throw new UDFArgumentTypeException(1, "\""
+          + "int\" is expected at function execute_splits, " + "but \""
+          + arguments[1].getTypeName() + "\" is found");
+    }
+
+    stringOI = (StringObjectInspector) arguments[0];
+    intOI = (IntObjectInspector) arguments[1];
+
+    List<String> names = Arrays.asList("split_num","value");
+    List<ObjectInspector> fieldOIs = Arrays.<ObjectInspector>asList(
+      PrimitiveObjectInspectorFactory.javaIntObjectInspector,
+      PrimitiveObjectInspectorFactory.javaStringObjectInspector);
+    StructObjectInspector outputOI = ObjectInspectorFactory.getStandardStructObjectInspector(names, fieldOIs);
+
+    LOG.debug("done initializing GenericUDTFExecuteSplits");
+    return outputOI;
+  }
+
+  @Override
+  public void process(Object[] arguments) throws HiveException {
+
+    String query = stringOI.getPrimitiveJavaObject(arguments[0]);
+    int num = intOI.get(arguments[1]);
+
+    PlanFragment fragment = createPlanFragment(query, num);
+    try {
+      InputFormat<NullWritable, Text> format = (InputFormat<NullWritable,Text>)(Class.forName("org.apache.hadoop.hive.llap.LlapInputFormat").newInstance());
+      int index = 0;
+      for (InputSplit s: getSplits(jc, num, fragment.work, fragment.schema)) {
+        RecordReader<NullWritable, Text> reader = format.getRecordReader(s,fragment.jc,null);
+        Text value = reader.createValue();
+        NullWritable key = reader.createKey();
+        index++;
+        while(reader.next(key,value)) {
+          Object[] os = new Object[2];
+          os[0] = index;
+          os[1] = value.toString();
+          forward(os);
+        }
+      }
+    } catch(Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
+  @Override
+  public void close() throws HiveException {
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
new file mode 100644
index 0000000..ebb0ca5
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -0,0 +1,420 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.udf.generic;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.LlapInputSplit;
+import org.apache.hadoop.hive.llap.LlapOutputFormat;
+import org.apache.hadoop.hive.llap.SubmitWorkInfo;
+import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
+import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
+import org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator;
+import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.udf.UDFType;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TaskLocationHint;
+import org.apache.tez.dag.api.TaskSpecBuilder;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * GenericUDTFGetSplits.
+ *
+ */
+@Description(name = "get_splits", value = "_FUNC_(string,int) - "
+    + "Returns an array of length int serialized splits for the referenced tables string.")
+@UDFType(deterministic = false)
+public class GenericUDTFGetSplits extends GenericUDTF {
+
+  private static final Logger LOG = LoggerFactory.getLogger(GenericUDTFGetSplits.class);
+
+  private static final String LLAP_INTERNAL_INPUT_FORMAT_NAME = "org.apache.hadoop.hive.llap.LlapInputFormat";
+
+  protected transient StringObjectInspector stringOI;
+  protected transient IntObjectInspector intOI;
+  protected transient JobConf jc;
+  private ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
+  private DataOutput dos = new DataOutputStream(bos);
+
+  @Override
+  public StructObjectInspector initialize(ObjectInspector[] arguments)
+    throws UDFArgumentException {
+
+    LOG.debug("initializing GenericUDFGetSplits");
+
+    if (SessionState.get() == null || SessionState.get().getConf() == null) {
+      throw new IllegalStateException("Cannot run get splits outside HS2");
+    }
+
+    LOG.debug("Initialized conf, jc and metastore connection");
+
+    if (arguments.length != 2) {
+      throw new UDFArgumentLengthException("The function GET_SPLITS accepts 2 arguments.");
+    } else if (!(arguments[0] instanceof StringObjectInspector)) {
+      LOG.error("Got "+arguments[0].getTypeName()+" instead of string.");
+      throw new UDFArgumentTypeException(0, "\""
+          + "string\" is expected at function GET_SPLITS, " + "but \""
+          + arguments[0].getTypeName() + "\" is found");
+    } else if (!(arguments[1] instanceof IntObjectInspector)) {
+      LOG.error("Got "+arguments[1].getTypeName()+" instead of int.");
+      throw new UDFArgumentTypeException(1, "\""
+          + "int\" is expected at function GET_SPLITS, " + "but \""
+          + arguments[1].getTypeName() + "\" is found");
+    }
+
+    stringOI = (StringObjectInspector) arguments[0];
+    intOI = (IntObjectInspector) arguments[1];
+
+    List<String> names = Arrays.asList("if_class","split_class","split");
+    List<ObjectInspector> fieldOIs = Arrays.<ObjectInspector>asList(
+      PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+      PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+      PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector);
+    StructObjectInspector outputOI = ObjectInspectorFactory.getStandardStructObjectInspector(names, fieldOIs);
+
+    LOG.debug("done initializing GenericUDFGetSplits");
+    return outputOI;
+  }
+
+  public static class PlanFragment {
+    public JobConf jc;
+    public TezWork work;
+    public Schema schema;
+
+    public PlanFragment(TezWork work, Schema schema, JobConf jc) {
+      this.work = work;
+      this.schema = schema;
+      this.jc = jc;
+    }
+  }
+
+  @Override
+  public void process(Object[] arguments) throws HiveException {
+
+    String query = stringOI.getPrimitiveJavaObject(arguments[0]);
+    int num = intOI.get(arguments[1]);
+
+    PlanFragment fragment = createPlanFragment(query, num);
+    TezWork tezWork = fragment.work;
+    Schema schema = fragment.schema;
+
+    try {
+      for (InputSplit s: getSplits(jc, num, tezWork, schema)) {
+        Object[] os = new Object[3];
+        os[0] = LLAP_INTERNAL_INPUT_FORMAT_NAME;
+        os[1] = s.getClass().getName();
+        bos.reset();
+        s.write(dos);
+        byte[] frozen = bos.toByteArray();
+        os[2] = frozen;
+        forward(os);
+      }
+    } catch(Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
+  public PlanFragment createPlanFragment(String query, int num) throws HiveException {
+
+    HiveConf conf = new HiveConf(SessionState.get().getConf());
+    HiveConf.setVar(conf, ConfVars.HIVEFETCHTASKCONVERSION, "none");
+    HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, "Llap");
+
+    String originalMode = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE);
+    HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE, "llap");
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_TEZ_GENERATE_CONSISTENT_SPLITS, true);
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS, true);
+
+    try {
+      jc = DagUtils.getInstance().createConfiguration(conf);
+    } catch (IOException e) {
+      throw new HiveException(e);
+    }
+
+    Driver driver = new Driver(conf);
+    CommandProcessorResponse cpr;
+
+    LOG.info("setting fetch.task.conversion to none and query file format to \""
+        + LlapOutputFormat.class.getName()+"\"");
+
+    cpr = driver.compileAndRespond(query);
+    if(cpr.getResponseCode() != 0) {
+      throw new HiveException("Failed to compile query: "+cpr.getException());
+    }
+
+    QueryPlan plan = driver.getPlan();
+    List<Task<?>> roots = plan.getRootTasks();
+    Schema schema = plan.getResultSchema();
+
+    if (roots == null || roots.size() != 1 || !(roots.get(0) instanceof TezTask)) {
+      throw new HiveException("Was expecting a single TezTask.");
+    }
+
+    TezWork tezWork = ((TezTask)roots.get(0)).getWork();
+
+    if (tezWork.getAllWork().size() != 1) {
+
+      String tableName = "table_"+UUID.randomUUID().toString().replaceAll("[^A-Za-z0-9 ]", "");
+
+      String ctas = "create temporary table "+tableName+" as "+query;
+      LOG.info("CTAS: "+ctas);
+
+      try {
+        HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE, originalMode);
+        cpr = driver.run(ctas, false);
+      } catch(CommandNeedRetryException e) {
+        throw new HiveException(e);
+      }
+
+      if(cpr.getResponseCode() != 0) {
+        throw new HiveException("Failed to create temp table: " + cpr.getException());
+      }
+
+      HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE, "llap");
+      query = "select * from " + tableName;
+      cpr = driver.compileAndRespond(query);
+      if(cpr.getResponseCode() != 0) {
+        throw new HiveException("Failed to create temp table: "+cpr.getException());
+      }
+
+      plan = driver.getPlan();
+      roots = plan.getRootTasks();
+      schema = plan.getResultSchema();
+
+      if (roots == null || roots.size() != 1 || !(roots.get(0) instanceof TezTask)) {
+        throw new HiveException("Was expecting a single TezTask.");
+      }
+
+      tezWork = ((TezTask)roots.get(0)).getWork();
+    }
+
+    return new PlanFragment(tezWork, schema, jc);
+  }
+
+  public InputSplit[] getSplits(JobConf job, int numSplits, TezWork work, Schema schema)
+    throws IOException {
+
+    DAG dag = DAG.create(work.getName());
+    dag.setCredentials(job.getCredentials());
+    // TODO: set access control? TezTask.setAccessControlsForCurrentUser(dag);
+
+    DagUtils utils = DagUtils.getInstance();
+    Context ctx = new Context(job);
+    MapWork mapWork = (MapWork) work.getAllWork().get(0);
+    // bunch of things get setup in the context based on conf but we need only the MR tmp directory
+    // for the following method.
+    JobConf wxConf = utils.initializeVertexConf(job, ctx, mapWork);
+    Path scratchDir = utils.createTezDir(ctx.getMRScratchDir(), job);
+    FileSystem fs = scratchDir.getFileSystem(job);
+    try {
+      LocalResource appJarLr = createJarLocalResource(utils.getExecJarPathLocal(), utils, job);
+      Vertex wx = utils.createVertex(wxConf, mapWork, scratchDir, appJarLr,
+          new ArrayList<LocalResource>(), fs, ctx, false, work,
+          work.getVertexType(mapWork));
+      String vertexName = wx.getName();
+      dag.addVertex(wx);
+      utils.addCredentials(mapWork, dag);
+
+
+      // we have the dag now proceed to get the splits:
+      HiveSplitGenerator splitGenerator = new HiveSplitGenerator(null);
+      Preconditions.checkState(HiveConf.getBoolVar(wxConf,
+              HiveConf.ConfVars.HIVE_TEZ_GENERATE_CONSISTENT_SPLITS));
+      Preconditions.checkState(HiveConf.getBoolVar(wxConf,
+              HiveConf.ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS));
+      splitGenerator.initializeSplitGenerator(wxConf, mapWork);
+      List<Event> eventList = splitGenerator.initialize();
+
+      // hack - just serializing with kryo for now. This needs to be done properly
+      InputSplit[] result = new InputSplit[eventList.size() - 1];
+      DataOutputBuffer dob = new DataOutputBuffer();
+
+      InputConfigureVertexTasksEvent configureEvent
+        = (InputConfigureVertexTasksEvent) eventList.get(0);
+
+      List<TaskLocationHint> hints = configureEvent.getLocationHint().getTaskLocationHints();
+
+      Preconditions.checkState(hints.size() == eventList.size() - 1);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("NumEvents=" + eventList.size());
+        LOG.debug("NumSplits=" + result.length);
+      }
+
+      ApplicationId fakeApplicationId
+        = ApplicationId.newInstance(Math.abs(new Random().nextInt()), 0);
+
+      LOG.info("Number of splits: " + (eventList.size() - 1));
+      for (int i = 0; i < eventList.size() - 1; i++) {
+
+        TaskSpec taskSpec =
+          new TaskSpecBuilder().constructTaskSpec(dag, vertexName,
+              eventList.size() - 1, fakeApplicationId, i);
+
+        SubmitWorkInfo submitWorkInfo =
+          new SubmitWorkInfo(taskSpec, fakeApplicationId, System.currentTimeMillis());
+        EventMetaData sourceMetaData =
+          new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, vertexName,
+              "NULL_VERTEX", null);
+        EventMetaData destinationMetaInfo = new TaskSpecBuilder().getDestingationMetaData(wx);
+
+        // Creating the TezEvent here itself, since it's easy to serialize.
+        Event event = eventList.get(i + 1);
+        TaskLocationHint hint = hints.get(i);
+        Set<String> hosts = hint.getHosts();
+        if (hosts.size() != 1) {
+          LOG.warn("Bad # of locations: " + hosts.size());
+        }
+        SplitLocationInfo[] locations = new SplitLocationInfo[hosts.size()];
+
+        int j = 0;
+        for (String host : hosts) {
+          locations[j++] = new SplitLocationInfo(host, false);
+        }
+        TezEvent tezEvent = new TezEvent(event, sourceMetaData, System.currentTimeMillis());
+        tezEvent.setDestinationInfo(destinationMetaInfo);
+
+        bos.reset();
+        dob.reset();
+        tezEvent.write(dob);
+
+        byte[] submitWorkBytes = SubmitWorkInfo.toBytes(submitWorkInfo);
+
+        result[i] = new LlapInputSplit(i, submitWorkBytes, dob.getData(), locations, schema);
+      }
+      return result;
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Returns a local resource representing a jar. This resource will be used to execute the plan on
+   * the cluster.
+   *
+   * @param localJarPath
+   *          Local path to the jar to be localized.
+   * @return LocalResource corresponding to the localized hive exec resource.
+   * @throws IOException
+   *           when any file system related call fails.
+   * @throws LoginException
+   *           when we are unable to determine the user.
+   * @throws URISyntaxException
+   *           when current jar location cannot be determined.
+   */
+  private LocalResource createJarLocalResource(String localJarPath, DagUtils utils,
+      Configuration conf)
+    throws IOException, LoginException, IllegalArgumentException, FileNotFoundException {
+    FileStatus destDirStatus = utils.getHiveJarDirectory(conf);
+    assert destDirStatus != null;
+    Path destDirPath = destDirStatus.getPath();
+
+    Path localFile = new Path(localJarPath);
+    String sha = getSha(localFile, conf);
+
+    String destFileName = localFile.getName();
+
+    // Now, try to find the file based on SHA and name. Currently we require exact name match.
+    // We could also allow cutting off versions and other stuff provided that SHA matches...
+    destFileName = FilenameUtils.removeExtension(destFileName) + "-" + sha
+      + FilenameUtils.EXTENSION_SEPARATOR + FilenameUtils.getExtension(destFileName);
+
+    // TODO: if this method is ever called on more than one jar, getting the dir and the
+    // list need to be refactored out to be done only once.
+    Path destFile = new Path(destDirPath.toString() + "/" + destFileName);
+    return utils.localizeResource(localFile, destFile, LocalResourceType.FILE, conf);
+  }
+
+  private String getSha(Path localFile, Configuration conf)
+    throws IOException, IllegalArgumentException {
+    InputStream is = null;
+    try {
+      FileSystem localFs = FileSystem.getLocal(conf);
+      is = localFs.open(localFile);
+      return DigestUtils.sha256Hex(is);
+    } finally {
+      if (is != null) {
+        is.close();
+      }
+    }
+  }
+
+  @Override
+  public void close() throws HiveException {
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java b/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java
index 5cabb6a..5db8c48 100644
--- a/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java
+++ b/ql/src/java/org/apache/tez/dag/api/TaskSpecBuilder.java
@@ -17,7 +17,7 @@ import org.apache.tez.runtime.api.impl.TaskSpec;
 // Proxy class within the tez.api package to access package private methods.
 public class TaskSpecBuilder {
 
-  public TaskSpec constructTaskSpec(DAG dag, String vertexName, int numSplits, ApplicationId appId) {
+  public TaskSpec constructTaskSpec(DAG dag, String vertexName, int numSplits, ApplicationId appId, int index) {
     Vertex vertex = dag.getVertex(vertexName);
     ProcessorDescriptor processorDescriptor = vertex.getProcessorDescriptor();
     List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputs =
@@ -43,7 +43,7 @@ public class TaskSpecBuilder {
 
     TezDAGID dagId = TezDAGID.getInstance(appId, 0);
     TezVertexID vertexId = TezVertexID.getInstance(dagId, 0);
-    TezTaskID taskId = TezTaskID.getInstance(vertexId, 0);
+    TezTaskID taskId = TezTaskID.getInstance(vertexId, index);
     TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 0);
     return new TaskSpec(taskAttemptId, dag.getName(), vertexName, numSplits, processorDescriptor, inputSpecs, outputSpecs, null);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
index c49231c..7b516fe 100644
--- a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
@@ -79,46 +79,52 @@ public class TestLlapOutputFormat {
   @Test
   public void testValues() throws Exception {
     JobConf job = new JobConf();
-    job.set(LlapOutputFormat.LLAP_OF_ID_KEY, "foobar");
-    LlapOutputFormat format = new LlapOutputFormat();
 
-    HiveConf conf = new HiveConf();
-    Socket socket = new Socket("localhost",
-        conf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT));
+    for (int k = 0; k < 5; ++k) {
+      String id = "foobar"+k;
+      job.set(LlapOutputFormat.LLAP_OF_ID_KEY, id);
+      LlapOutputFormat format = new LlapOutputFormat();
 
-    LOG.debug("Socket connected");
+      HiveConf conf = new HiveConf();
+      Socket socket = new Socket("localhost",
+          conf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT));
 
-    socket.getOutputStream().write("foobar".getBytes());
-    socket.getOutputStream().write(0);
-    socket.getOutputStream().flush();
+      LOG.debug("Socket connected");
 
-    Thread.sleep(3000);
+      socket.getOutputStream().write(id.getBytes());
+      socket.getOutputStream().write(0);
+      socket.getOutputStream().flush();
 
-    LOG.debug("Data written");
+      Thread.sleep(3000);
 
-    RecordWriter<NullWritable, Text> writer = format.getRecordWriter(null, job, null, null);
-    Text text = new Text();
+      LOG.debug("Data written");
 
-    LOG.debug("Have record writer");
+      RecordWriter<NullWritable, Text> writer = format.getRecordWriter(null, job, null, null);
+      Text text = new Text();
 
-    for (int i = 0; i < 10; ++i) {
-      text.set(""+i);
-      writer.write(NullWritable.get(),text);
-    }
+      LOG.debug("Have record writer");
 
-    writer.close(null);
+      for (int i = 0; i < 10; ++i) {
+        text.set(""+i);
+        writer.write(NullWritable.get(),text);
+      }
 
-    InputStream in = socket.getInputStream();
-    RecordReader reader = new LlapRecordReader(in, null, Text.class);
+      writer.close(null);
 
-    LOG.debug("Have record reader");
+      InputStream in = socket.getInputStream();
+      RecordReader reader = new LlapRecordReader(in, null, Text.class);
 
-    int count = 0;
-    while(reader.next(NullWritable.get(), text)) {
-      LOG.debug(text.toString());
-      count++;
-    }
+      LOG.debug("Have record reader");
+
+      int count = 0;
+      while(reader.next(NullWritable.get(), text)) {
+        LOG.debug(text.toString());
+        count++;
+      }
 
-    Assert.assertEquals(count,10);
+      reader.close();
+
+      Assert.assertEquals(count,10);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/ql/src/test/queries/clientpositive/udf_get_splits.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/udf_get_splits.q b/ql/src/test/queries/clientpositive/udf_get_splits.q
deleted file mode 100644
index 70400e8..0000000
--- a/ql/src/test/queries/clientpositive/udf_get_splits.q
+++ /dev/null
@@ -1,6 +0,0 @@
-set hive.fetch.task.conversion=more;
-
-DESCRIBE FUNCTION get_splits;
-DESCRIBE FUNCTION EXTENDED get_splits;
-
-select r.if_class as ic, r.split_class as sc, hash(r.split) as h, length(r.split) as l from (select explode(get_splits("select key, count(*) from src where key % 2 = 0 group by key",5)) as r) t;

http://git-wip-us.apache.org/repos/asf/hive/blob/57761e34/ql/src/test/queries/clientpositive/udtf_get_splits.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/udtf_get_splits.q b/ql/src/test/queries/clientpositive/udtf_get_splits.q
new file mode 100644
index 0000000..f378dca
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/udtf_get_splits.q
@@ -0,0 +1,43 @@
+set hive.fetch.task.conversion=more;
+set hive.mapred.mode=nonstrict;
+set mapred.max.split.size=100;
+set mapred.min.split.size.per.node=100;
+set mapred.min.split.size.per.rack=100;
+set mapred.max.split.size=100;
+set tez.grouping.max-size=100;
+set tez.grouping.min-size=100;
+
+DESCRIBE FUNCTION get_splits;
+DESCRIBE FUNCTION execute_splits;
+
+select r1, r2, floor(length(r3)/100000)
+from
+  (select
+    get_splits(
+      "select key, count(*) from srcpart where key % 2 = 0 group by key",
+      5) as (r1, r2, r3)) t;
+
+select r1, r2, floor(length(r3)/100000)
+from
+  (select
+    get_splits(
+      "select key from srcpart where key % 2 = 0",
+      5) as (r1, r2, r3)) t;
+
+show tables;
+
+select r1, r2
+from
+  (select
+    execute_splits(
+      "select key from srcpart where key % 2 = 0",
+      1) as (r1, r2)) t;
+
+select r1, r2
+from
+  (select
+    execute_splits(
+      "select key from srcpart where key % 2 = 0",
+      5) as (r1, r2)) t;
+
+select count(*) from (select key from srcpart where key % 2 = 0) t;