Posted to commits@hive.apache.org by we...@apache.org on 2017/05/05 17:32:28 UTC

[41/51] [partial] hive git commit: HIVE-14671 : merge master into hive-14535 (Wei Zheng)

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
index aeb89df..663e4b6 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
@@ -51,7 +51,7 @@ public class PigDelegator extends LauncherDelegator {
                boolean usesHcatalog, String completedUrl, boolean enablelog,
                Boolean enableJobReconnect)
     throws NotAuthorizedException, BadParam, BusyException, QueueException,
-    ExecuteException, IOException, InterruptedException {
+    ExecuteException, IOException, InterruptedException, TooManyRequestsException {
     runAs = user;
     List<String> args = makeArgs(execute,
       srcFile, pigArgs,

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SecureProxySupport.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SecureProxySupport.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SecureProxySupport.java
index 5aed3b3..793881b 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SecureProxySupport.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SecureProxySupport.java
@@ -170,6 +170,7 @@ public class SecureProxySupport {
         return null;
       }
     });
+    FileSystem.closeAllForUGI(ugi);
     return twrapper.tokens;
   }
   private static void collectTokens(FileSystem fs, TokenWrapper twrapper, Credentials creds, String userName) throws IOException {
@@ -204,6 +205,7 @@ public class SecureProxySupport {
         return null;
       }
     });
+    FileSystem.closeAllForUGI(ugi);
 
   }
 
@@ -220,6 +222,7 @@ public class SecureProxySupport {
         return client.getDelegationToken(c.getUser(), u);
       }
     });
+    FileSystem.closeAllForUGI(ugi);
     return s;
   }
 }

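The FileSystem.closeAllForUGI(ugi) calls added above release the FileSystem instances that Hadoop caches per UserGroupInformation, so repeated proxy-user requests do not leak cached filesystems. Below is a minimal sketch of that pattern, assuming only Hadoop's UserGroupInformation and FileSystem APIs; the class and method names (UgiFileSystemCleanupSketch, runAsProxyUser) are illustrative, and the try/finally placement is one possible variation rather than the exact structure used in this patch.

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

public class UgiFileSystemCleanupSketch {
  // Run a small HDFS action as a proxy user, then release the FileSystem
  // instances that Hadoop caches for that UGI so they do not accumulate.
  static void runAsProxyUser(String user, final Configuration conf) throws Exception {
    UserGroupInformation ugi = UserGroupInformation.createProxyUser(
        user, UserGroupInformation.getLoginUser());
    try {
      ugi.doAs(new PrivilegedExceptionAction<Void>() {
        @Override
        public Void run() throws IOException {
          FileSystem fs = FileSystem.get(conf);   // cached per-UGI instance
          fs.exists(new Path("/tmp"));            // placeholder filesystem call
          return null;
        }
      });
    } finally {
      FileSystem.closeAllForUGI(ugi);             // drop the cached instances
    }
  }
}
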
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
index 2da0204..43a7d57 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
@@ -27,6 +27,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import javax.servlet.http.HttpServletRequest;
@@ -650,7 +652,7 @@ public class Server {
                       @FormParam("enablelog") boolean enablelog,
                       @FormParam("enablejobreconnect") Boolean enablejobreconnect)
     throws NotAuthorizedException, BusyException, BadParam, QueueException,
-    ExecuteException, IOException, InterruptedException {
+    ExecuteException, IOException, InterruptedException, TooManyRequestsException {
     verifyUser();
     verifyParam(inputs, "input");
     verifyParam(mapper, "mapper");
@@ -704,7 +706,7 @@ public class Server {
                   @FormParam("enablelog") boolean enablelog,
                   @FormParam("enablejobreconnect") Boolean enablejobreconnect)
     throws NotAuthorizedException, BusyException, BadParam, QueueException,
-    ExecuteException, IOException, InterruptedException {
+    ExecuteException, IOException, InterruptedException, TooManyRequestsException {
     verifyUser();
     verifyParam(jar, "jar");
     verifyParam(mainClass, "class");
@@ -754,7 +756,7 @@ public class Server {
                @FormParam("enablelog") boolean enablelog,
                @FormParam("enablejobreconnect") Boolean enablejobreconnect)
     throws NotAuthorizedException, BusyException, BadParam, QueueException,
-    ExecuteException, IOException, InterruptedException {
+    ExecuteException, IOException, InterruptedException, TooManyRequestsException {
     verifyUser();
     if (execute == null && srcFile == null) {
       throw new BadParam("Either execute or file parameter required");
@@ -805,7 +807,7 @@ public class Server {
               @FormParam("enablelog") boolean enablelog,
               @FormParam("enablejobreconnect") Boolean enablejobreconnect)
     throws NotAuthorizedException, BusyException, BadParam, QueueException,
-    IOException, InterruptedException {
+    IOException, InterruptedException, TooManyRequestsException {
     verifyUser();
     if (command == null && optionsFile == null)
       throw new BadParam("Must define Sqoop command or a optionsfile contains Sqoop command to run Sqoop job.");
@@ -859,7 +861,7 @@ public class Server {
               @FormParam("enablelog") boolean enablelog,
               @FormParam("enablejobreconnect") Boolean enablejobreconnect)
     throws NotAuthorizedException, BusyException, BadParam, QueueException,
-    ExecuteException, IOException, InterruptedException {
+    ExecuteException, IOException, InterruptedException, TooManyRequestsException {
     verifyUser();
     if (execute == null && srcFile == null) {
       throw new BadParam("Either execute or file parameter required");
@@ -891,7 +893,8 @@ public class Server {
   @Path("jobs/{jobid}")
   @Produces({MediaType.APPLICATION_JSON})
   public QueueStatusBean showJobId(@PathParam("jobid") String jobid)
-    throws NotAuthorizedException, BadParam, IOException, InterruptedException {
+    throws NotAuthorizedException, BadParam, IOException, InterruptedException,
+           BusyException, TimeoutException, ExecutionException, TooManyRequestsException {
 
     verifyUser();
     verifyParam(jobid, ":jobid");
@@ -968,7 +971,8 @@ public class Server {
                                        @QueryParam("showall") boolean showall,
                                        @QueryParam("jobid") String jobid,
                                        @QueryParam("numrecords") String numrecords)
-    throws NotAuthorizedException, BadParam, IOException, InterruptedException {
+    throws NotAuthorizedException, BadParam, IOException, InterruptedException,
+    BusyException, TimeoutException, ExecutionException, TooManyRequestsException {
 
     verifyUser();
 
@@ -980,19 +984,14 @@ public class Server {
       showDetails = true;
     }
 
-    ListDelegator ld = new ListDelegator(appConf);
-    List<String> list = ld.run(getDoAsUser(), showall);
-    List<JobItemBean> detailList = new ArrayList<JobItemBean>();
-    int currRecord = 0;
     int numRecords;
-
     // Parse numrecords to an integer
     try {
       if (numrecords != null) {
         numRecords = Integer.parseInt(numrecords);
-  if (numRecords <= 0) {
-    throw new BadParam("numrecords should be an integer > 0");
-  }
+        if (numRecords <= 0) {
+          throw new BadParam("numrecords should be an integer > 0");
+        }
       }
       else {
         numRecords = -1;
@@ -1002,57 +1001,8 @@ public class Server {
       throw new BadParam("Invalid numrecords format: numrecords should be an integer > 0");
     }
 
-    // Sort the list as requested
-    boolean isAscendingOrder = true;
-    switch (appConf.getListJobsOrder()) {
-    case lexicographicaldesc:
-      Collections.sort(list, Collections.reverseOrder());
-      isAscendingOrder = false;
-      break;
-    case lexicographicalasc:
-    default:
-      Collections.sort(list);
-      break;
-    }
-
-    for (String job : list) {
-      // If numRecords = -1, fetch all records.
-      // Hence skip all the below checks when numRecords = -1.
-      if (numRecords != -1) {
-        // If currRecord >= numRecords, we have already fetched the top #numRecords
-        if (currRecord >= numRecords) {
-          break;
-        }
-        else if (jobid == null || jobid.trim().length() == 0) {
-            currRecord++;
-        }
-        // If the current record needs to be returned based on the
-        // filter conditions specified by the user, increment the counter
-        else if (isAscendingOrder && job.compareTo(jobid) > 0 || !isAscendingOrder && job.compareTo(jobid) < 0) {
-          currRecord++;
-        }
-        // The current record should not be included in the output detailList.
-        else {
-          continue;
-        }
-      }
-      JobItemBean jobItem = new JobItemBean();
-      jobItem.id = job;
-      if (showDetails) {
-        StatusDelegator sd = new StatusDelegator(appConf);
-        try {
-          jobItem.detail = sd.run(getDoAsUser(), job);
-        }
-        catch(Exception ex) {
-          /*if we could not get status for some reason, log it, and send empty status back with
-          * just the ID so that caller knows to even look in the log file*/
-          LOG.info("Failed to get status detail for jobId='" + job + "'", ex);
-          jobItem.detail = new QueueStatusBean(job, "Failed to retrieve status; see WebHCat logs");
-        }
-      }
-      detailList.add(jobItem);
-    }
-    return detailList;
+    ListDelegator ld = new ListDelegator(appConf);
+    return ld.run(getDoAsUser(), showall, jobid, numRecords, showDetails);
   }
 
   /**

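With this change the jobs listing endpoint keeps only the numrecords validation in Server.java and delegates sorting, jobid filtering and detail retrieval to ListDelegator.run(). The sketch below shows a client calling this endpoint with the query parameters declared above (showall, jobid, numrecords); the host, port 50111, the /templeton/v1 prefix and the user.name value are assumptions based on a typical WebHCat setup, not taken from this patch.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class ListJobsClientSketch {
  public static void main(String[] args) throws Exception {
    // Query parameter names mirror the @QueryParam annotations in Server.listJobs.
    URL url = new URL("http://localhost:50111/templeton/v1/jobs"
        + "?user.name=hcat&showall=true&jobid=job_1000&numrecords=10");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("GET");
    if (conn.getResponseCode() == 429) {
      // The new TooManyRequestsException surfaces as HTTP 429: back off and retry later.
      System.out.println("Server busy, retry later");
      return;
    }
    try (BufferedReader in = new BufferedReader(
        new InputStreamReader(conn.getInputStream()))) {
      String line;
      while ((line = in.readLine()) != null) {
        System.out.println(line);
      }
    }
  }
}
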
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java
index fde5f60..eb84fb2 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java
@@ -50,7 +50,7 @@ public class SqoopDelegator extends LauncherDelegator {
                String callback, String completedUrl, boolean enablelog,
                Boolean enableJobReconnect, String libdir)
   throws NotAuthorizedException, BadParam, BusyException, QueueException,
-  IOException, InterruptedException
+  IOException, InterruptedException, TooManyRequestsException
   {
     if(TempletonUtils.isset(appConf.sqoopArchive())) {
       if(!TempletonUtils.isset(appConf.sqoopPath()) && !TempletonUtils.isset(appConf.sqoopHome())) {

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java
index fac0170..c042ae8 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java
@@ -19,10 +19,13 @@
 package org.apache.hive.hcatalog.templeton;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.mapred.JobID;
@@ -41,18 +44,78 @@ import org.apache.hive.hcatalog.templeton.tool.JobState;
  */
 public class StatusDelegator extends TempletonDelegator {
   private static final Logger LOG = LoggerFactory.getLogger(StatusDelegator.class);
+  private final String JOB_STATUS_EXECUTE_THREAD_PREFIX = "JobStatusExecute";
+
+  /**
+   * Name of the current thread, recorded so it can be embedded in the names of the execution threads.
+   */
+  private final String statusThreadId = Thread.currentThread().getName();
+
+  /*
+   * Job status request executor to get status of a job.
+   */
+  private static JobRequestExecutor<QueueStatusBean> jobRequest =
+                   new JobRequestExecutor<QueueStatusBean>(JobRequestExecutor.JobRequestType.Status,
+                   AppConfig.JOB_STATUS_MAX_THREADS, AppConfig.JOB_STATUS_TIMEOUT);
 
   public StatusDelegator(AppConfig appConf) {
     super(appConf);
   }
 
-  public QueueStatusBean run(String user, String id)
+  /*
+   * Gets the status of a job from its job id. If a maximum number of concurrent job status
+   * requests is configured then the status request is executed on a thread from the thread
+   * pool. If a job status request timeout is configured then the execution thread is
+   * interrupted when it times out and no further action is taken.
+   */
+  public QueueStatusBean run(final String user, final String id, boolean enableThreadPool)
+    throws NotAuthorizedException, BadParam, IOException, InterruptedException,
+           BusyException, TimeoutException, ExecutionException, TooManyRequestsException {
+    if (jobRequest.isThreadPoolEnabled() && enableThreadPool) {
+      return jobRequest.execute(getJobStatusCallableTask(user, id));
+    } else {
+      return getJobStatus(user, id);
+    }
+  }
+
+  /*
+   * Job callable task for the job status operation. Overrides execute() to get the
+   * status of a job. There is no need to override cleanup() as there is nothing to be
+   * done if the job status operation times out or is interrupted.
+   */
+  private JobCallable<QueueStatusBean> getJobStatusCallableTask(final String user,
+                                 final String id) {
+    return new JobCallable<QueueStatusBean>() {
+      @Override
+      public QueueStatusBean execute() throws NotAuthorizedException, BadParam, IOException,
+                                    InterruptedException, BusyException {
+       /*
+        * Change the current thread name to include the parent thread id when executed
+        * in the thread pool. Useful for extracting logs specific to a job request and
+        * for debugging job issues.
+        */
+        Thread.currentThread().setName(String.format("%s-%s-%s", JOB_STATUS_EXECUTE_THREAD_PREFIX,
+                                       statusThreadId, Thread.currentThread().getId()));
+
+        return getJobStatus(user, id);
+      }
+    };
+  }
+
+  public QueueStatusBean run(final String user, final String id)
+    throws NotAuthorizedException, BadParam, IOException, InterruptedException,
+           BusyException, TimeoutException, ExecutionException, TooManyRequestsException {
+    return run(user, id, true);
+  }
+
+  public QueueStatusBean getJobStatus(String user, String id)
     throws NotAuthorizedException, BadParam, IOException, InterruptedException
   {
     WebHCatJTShim tracker = null;
     JobState state = null;
+    UserGroupInformation ugi = null;
     try {
-      UserGroupInformation ugi = UgiFactory.getUgi(user);
+      ugi = UgiFactory.getUgi(user);
       tracker = ShimLoader.getHadoopShims().getWebHCatShim(appConf, ugi);
       JobID jobid = StatusDelegator.StringToJobID(id);
       if (jobid == null)
@@ -66,6 +129,8 @@ public class StatusDelegator extends TempletonDelegator {
         tracker.close();
       if (state != null)
         state.close();
+      if (ugi != null)
+        FileSystem.closeAllForUGI(ugi);
     }
   }
 

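The new run() overload hands getJobStatus() to a shared JobRequestExecutor with a bounded thread pool and a per-request timeout; the executor implementation itself is not part of this hunk. The sketch below only illustrates the general mechanism such an executor can build on (a bounded pool, Future.get with a timeout, and cancel(true) to interrupt the worker); it is an illustration under those assumptions, not the actual JobRequestExecutor code.

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedRequestSketch {
  private final ExecutorService pool;
  private final long timeoutSeconds;

  BoundedRequestSketch(int maxThreads, long timeoutSeconds) {
    this.pool = Executors.newFixedThreadPool(maxThreads);
    this.timeoutSeconds = timeoutSeconds;
  }

  <T> T execute(Callable<T> task) throws Exception {
    Future<T> future = pool.submit(task);
    try {
      // Bound how long the caller waits for the job operation.
      return future.get(timeoutSeconds, TimeUnit.SECONDS);
    } catch (TimeoutException e) {
      // Interrupt the worker thread; for status/list there is nothing to clean up.
      future.cancel(true);
      throw e;
    }
  }
}

In the real executor, requests beyond the configured limit are rejected with TooManyRequestsException rather than queued indefinitely, which is what the concurrency tests added later in this commit exercise.
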
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java
index 839b56a..590e49f 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java
@@ -51,7 +51,7 @@ public class StreamingDelegator extends LauncherDelegator {
                Boolean enableJobReconnect,
                JobType jobType)
     throws NotAuthorizedException, BadParam, BusyException, QueueException,
-    ExecuteException, IOException, InterruptedException {
+    ExecuteException, IOException, InterruptedException, TooManyRequestsException {
       List<String> args = makeArgs(inputs, inputreader, output, mapper, reducer, combiner,
       fileList, cmdenvs, jarArgs);
 

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TooManyRequestsException.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TooManyRequestsException.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TooManyRequestsException.java
new file mode 100644
index 0000000..9d55ad4
--- /dev/null
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TooManyRequestsException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.templeton;
+
+/**
+ * Raised when the web service is busy with existing requests and is not able to
+ * service new requests.
+ */
+public class TooManyRequestsException extends SimpleWebException {
+  /*
+   * The current version of the Jetty server doesn't define the status
+   * HttpStatus.TOO_MANY_REQUESTS_429, so the code is defined as a constant here.
+   */
+  public static int TOO_MANY_REQUESTS_429 = 429;
+
+  public TooManyRequestsException(String msg) {
+    super(TOO_MANY_REQUESTS_429, msg);
+  }
+}

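Because TooManyRequestsException extends SimpleWebException, callers can inspect the inherited httpCode field (the tests later in this commit compare it against TOO_MANY_REQUESTS_429). The sketch below shows one way a caller might back off and retry a status request when the service is busy; the retry policy and the class name BusyRetrySketch are illustrative, and the class is assumed to live in org.apache.hive.hcatalog.templeton so the templeton types resolve without imports.

// Hedged sketch: retry a WebHCat status request when the service reports it is
// busy (HTTP 429). The retry/backoff policy here is illustrative, not part of
// this patch.
public class BusyRetrySketch {
  static QueueStatusBean statusWithRetry(StatusDelegator delegator, String user, String jobId)
      throws Exception {
    int attempts = 3;
    long backoffMillis = 1000;
    while (true) {
      try {
        return delegator.run(user, jobId);
      } catch (TooManyRequestsException e) {
        // e.httpCode is TooManyRequestsException.TOO_MANY_REQUESTS_429 (429).
        if (--attempts == 0) {
          throw e;
        }
        Thread.sleep(backoffMillis);
        backoffMillis *= 2;   // simple exponential backoff before retrying
      }
    }
  }
}
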
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
index 15ab8b9..f4c4b76 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
@@ -81,9 +81,14 @@ public class TempletonControllerJob extends Configured implements Tool, JobSubmi
     this.appConf = conf;
   }
 
-  private JobID submittedJobId;
+  private Job job = null;
 
   public String getSubmittedId() {
+    if (job == null ) {
+      return null;
+    }
+
+    JobID submittedJobId = job.getJobID();
     if (submittedJobId == null) {
       return null;
     } else {
@@ -119,7 +124,7 @@ public class TempletonControllerJob extends Configured implements Tool, JobSubmi
 
     String user = UserGroupInformation.getCurrentUser().getShortUserName();
     conf.set("user.name", user);
-    Job job = new Job(conf);
+    job = new Job(conf);
     job.setJarByClass(LaunchMapper.class);
     job.setJobName(TempletonControllerJob.class.getSimpleName());
     job.setMapperClass(LaunchMapper.class);
@@ -141,7 +146,7 @@ public class TempletonControllerJob extends Configured implements Tool, JobSubmi
 
     job.submit();
 
-    submittedJobId = job.getJobID();
+    JobID submittedJobId = job.getJobID();
     if(metastoreTokenStrForm != null) {
       //so that it can be cancelled later from CompleteDelegator
       DelegationTokenCache.getStringFormTokenCache().storeDelegationToken(

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java
index 07b005b..e0ccc70 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java
@@ -362,6 +362,7 @@ public class TempletonUtils {
     if (hadoopFsIsMissing(defaultFs, p))
       throw new FileNotFoundException("File " + fname + " does not exist.");
 
+    FileSystem.closeAllForUGI(ugi);
     return p;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/ConcurrentJobRequestsTestBase.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/ConcurrentJobRequestsTestBase.java b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/ConcurrentJobRequestsTestBase.java
new file mode 100644
index 0000000..5fcae46
--- /dev/null
+++ b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/ConcurrentJobRequestsTestBase.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.templeton;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.Future;
+
+import org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
+
+/*
+ * Base class for mocking job operations with concurrent requests.
+ */
+public class ConcurrentJobRequestsTestBase {
+  private static final Logger LOG = LoggerFactory.getLogger(ConcurrentJobRequestsTestBase.class);
+  private boolean started = false;
+  private Object lock = new Object();
+
+  MockAnswerTestHelper<QueueStatusBean> statusJobHelper = new MockAnswerTestHelper<QueueStatusBean>();
+  MockAnswerTestHelper<QueueStatusBean> killJobHelper = new MockAnswerTestHelper<QueueStatusBean>();
+  MockAnswerTestHelper<List<JobItemBean>> listJobHelper = new MockAnswerTestHelper<List<JobItemBean>>();
+  MockAnswerTestHelper<Integer> submitJobHelper = new MockAnswerTestHelper<Integer>();
+
+  /*
+   * Waits for all participating threads to start and returns this thread's id.
+   */
+  private int waitForAllThreadsToStart(JobRunnable jobRunnable, int poolThreadCount) {
+    int currentId = jobRunnable.threadStartCount.incrementAndGet();
+    LOG.info("Waiting for other threads with thread id: " + currentId);
+    synchronized(lock) {
+      /*
+       * We need a total of poolThreadCount + 1 threads to start at the same time. There are
+       * poolThreadCount threads in the thread pool and one more thread which started them.
+       * The thread that sees the atomic counter reach poolThreadCount + 1 is the last thread
+       * to join and wakes up all threads so they start at once.
+       */
+      if (currentId > poolThreadCount) {
+        LOG.info("Waking up all threads: " + currentId);
+        started = true;
+        this.lock.notifyAll();
+      } else {
+        while (!started) {
+          try {
+            this.lock.wait();
+          } catch (InterruptedException ignore) {
+          }
+        }
+      }
+    }
+
+    return currentId;
+  }
+
+  public JobRunnable ConcurrentJobsStatus(final int threadCount, AppConfig appConfig,
+         final boolean killThreads, boolean interruptThreads, final Answer<QueueStatusBean> answer)
+         throws IOException, InterruptedException, QueueException, NotAuthorizedException,
+         BadParam, BusyException {
+
+    StatusDelegator delegator = new StatusDelegator(appConfig);
+    final StatusDelegator mockDelegator = Mockito.spy(delegator);
+
+    Mockito.doAnswer(answer).when(mockDelegator).getJobStatus(Mockito.any(String.class),
+                             Mockito.any(String.class));
+
+    JobRunnable statusJobRunnable = new JobRunnable() {
+      @Override
+      public void run() {
+        try {
+          int threadId = waitForAllThreadsToStart(this, threadCount);
+          LOG.info("Started executing Job Status operation. ThreadId : " + threadId);
+          mockDelegator.run("admin", "job_1000" + threadId);
+        } catch (Exception ex) {
+          exception = ex;
+        }
+      }
+    };
+
+    executeJobOperations(statusJobRunnable, threadCount, killThreads, interruptThreads);
+    return statusJobRunnable;
+  }
+
+  public JobRunnable ConcurrentListJobs(final int threadCount, AppConfig config,
+         final boolean killThreads, boolean interruptThreads, final Answer<List<JobItemBean>> answer)
+         throws IOException, InterruptedException, QueueException, NotAuthorizedException,
+         BadParam, BusyException {
+
+    ListDelegator delegator = new ListDelegator(config);
+    final ListDelegator mockDelegator = Mockito.spy(delegator);
+
+    Mockito.doAnswer(answer).when(mockDelegator).listJobs(Mockito.any(String.class),
+                             Mockito.any(boolean.class), Mockito.any(String.class),
+                             Mockito.any(int.class), Mockito.any(boolean.class));
+
+    JobRunnable listJobRunnable = new JobRunnable() {
+      @Override
+      public void run() {
+        try {
+          int threadId = waitForAllThreadsToStart(this, threadCount);
+          LOG.info("Started executing Job List operation. ThreadId : " + threadId);
+          mockDelegator.run("admin", true, "", 10, true);
+        } catch (Exception ex) {
+          exception = ex;
+        }
+      }
+    };
+
+    executeJobOperations(listJobRunnable, threadCount, killThreads, interruptThreads);
+    return listJobRunnable;
+  }
+
+  public JobRunnable SubmitConcurrentJobs(final int threadCount, AppConfig config,
+         final boolean killThreads, boolean interruptThreads, final Answer<Integer> responseAnswer,
+         final Answer<QueueStatusBean> timeoutResponseAnswer, final String jobIdResponse)
+         throws IOException, InterruptedException, QueueException, NotAuthorizedException,
+         BusyException, TimeoutException, Exception {
+
+    LauncherDelegator delegator = new LauncherDelegator(config);
+    final LauncherDelegator mockDelegator = Mockito.spy(delegator);
+    final List<String> listArgs = new ArrayList<String>();
+
+    TempletonControllerJob mockCtrl = Mockito.mock(TempletonControllerJob.class);
+
+    Mockito.doReturn(jobIdResponse).when(mockCtrl).getSubmittedId();
+
+    Mockito.doReturn(mockCtrl).when(mockDelegator).getTempletonController();
+
+    Mockito.doAnswer(responseAnswer).when(mockDelegator).runTempletonControllerJob(
+              Mockito.any(TempletonControllerJob.class), Mockito.any(List.class));
+
+    Mockito.doAnswer(timeoutResponseAnswer).when(mockDelegator).killJob(
+              Mockito.any(String.class), Mockito.any(String.class));
+
+    Mockito.doNothing().when(mockDelegator).registerJob(Mockito.any(String.class),
+           Mockito.any(String.class), Mockito.any(String.class), Mockito.any(Map.class));
+
+    JobRunnable submitJobRunnable = new JobRunnable() {
+      @Override
+      public void run() {
+        try {
+          int threadId = waitForAllThreadsToStart(this, threadCount);
+          LOG.info("Started executing Job Submit operation. ThreadId : " + threadId);
+          mockDelegator.enqueueController("admin", null, "", listArgs);
+        } catch (Throwable ex) {
+          exception = ex;
+        }
+      }
+    };
+
+    executeJobOperations(submitJobRunnable, threadCount, killThreads, interruptThreads);
+    return submitJobRunnable;
+  }
+
+  public void executeJobOperations(JobRunnable jobRunnable, int threadCount, boolean killThreads,
+                                   boolean interruptThreads)
+    throws IOException, InterruptedException, QueueException, NotAuthorizedException {
+
+    started = false;
+
+    ExecutorService executorService = new ThreadPoolExecutor(threadCount, threadCount, 0L,
+        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+
+    ArrayList<Future<?>> futures = new ArrayList<Future<?>>();
+    for (int i = 0; i < threadCount; i++) {
+      futures.add(executorService.submit(jobRunnable));
+    }
+
+    waitForAllThreadsToStart(jobRunnable, threadCount);
+    LOG.info("Started all threads ");
+
+    if (killThreads) {
+      executorService.shutdownNow();
+    } else {
+      if (interruptThreads){
+        for (Future<?> future : futures) {
+          LOG.info("Cancelling the thread");
+          future.cancel(true);
+        }
+      }
+
+      executorService.shutdown();
+    }
+
+    /*
+     * For both graceful and forceful shutdown, wait for tasks to terminate so that
+     * appropriate exceptions are raised and stored in JobRunnable.exception.
+     */
+    if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
+      LOG.info("Force Shutting down the pool\n");
+      if (!killThreads) {
+        /*
+         * killThreads option has already done force shutdown. No need to do again.
+         */
+        executorService.shutdownNow();
+      }
+    }
+  }
+
+  public abstract class JobRunnable implements Runnable {
+    public volatile Throwable exception = null;
+    public AtomicInteger threadStartCount = new AtomicInteger(0);
+  }
+}

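waitForAllThreadsToStart() above implements a hand-rolled start barrier with an AtomicInteger and wait/notifyAll so that all poolThreadCount pool threads plus the coordinating thread kick off the mocked operation at once. The same start-together behavior can also be expressed with java.util.concurrent.CountDownLatch; the sketch below shows that alternative for comparison only, it is not the approach used in the test base.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StartTogetherSketch {
  public static void main(String[] args) throws InterruptedException {
    final int poolThreadCount = 6;
    // One count per worker plus one for the coordinating thread, mirroring the
    // poolThreadCount + 1 participants described in waitForAllThreadsToStart.
    final CountDownLatch startGate = new CountDownLatch(poolThreadCount + 1);
    ExecutorService pool = Executors.newFixedThreadPool(poolThreadCount);
    for (int i = 0; i < poolThreadCount; i++) {
      pool.submit(new Runnable() {
        @Override
        public void run() {
          startGate.countDown();
          try {
            startGate.await();               // block until every participant is ready
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
          }
          System.out.println("started " + Thread.currentThread().getName());
        }
      });
    }
    startGate.countDown();                    // coordinator joins as the last participant
    startGate.await();
    pool.shutdown();
  }
}
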
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/MockAnswerTestHelper.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/MockAnswerTestHelper.java b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/MockAnswerTestHelper.java
new file mode 100644
index 0000000..9f1744e
--- /dev/null
+++ b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/MockAnswerTestHelper.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.templeton;
+
+import java.io.IOException;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/*
+ * Helper class to generate mocked responses.
+ */
+public class MockAnswerTestHelper<T> {
+  public Answer<T> getIOExceptionAnswer() {
+    return new Answer<T>() {
+      @Override
+      public T answer(InvocationOnMock invocation) throws Exception {
+        throw new IOException("IOException raised manually.");
+      }
+    };
+  }
+
+  public Answer<T> getOutOfMemoryErrorAnswer() {
+    return new Answer<T>() {
+      @Override
+      public T answer(InvocationOnMock invocation) throws OutOfMemoryError {
+        throw new OutOfMemoryError("OutOfMemoryError raised manually.");
+      }
+    };
+  }
+
+  public Answer<T> getDelayedResonseAnswer(final int delayInSeconds, final T response) {
+    return new Answer<T>() {
+      @Override
+      public T answer(InvocationOnMock invocation) throws InterruptedException {
+        Thread.sleep(1000 * delayInSeconds);
+        return response;
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequests.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequests.java b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequests.java
new file mode 100644
index 0000000..695dcc6
--- /dev/null
+++ b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequests.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.templeton;
+
+import java.util.ArrayList;
+
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import static org.junit.Assert.assertTrue;
+
+/*
+ * Test submission of concurrent job requests.
+ */
+public class TestConcurrentJobRequests extends ConcurrentJobRequestsTestBase {
+
+  private static AppConfig config;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @BeforeClass
+  public static void setUp() {
+    final String[] args = new String[] {};
+    Main main = new Main(args);
+    config = main.getAppConfigInstance();
+  }
+
+  @Test
+  public void ConcurrentJobsStatusSuccess() {
+    try {
+      JobRunnable jobRunnable = ConcurrentJobsStatus(6, config, false, false,
+                statusJobHelper.getDelayedResonseAnswer(4, new QueueStatusBean("job_1000", "Job not found")));
+      assertTrue(jobRunnable.exception == null);
+    } catch (Exception e) {
+      assertTrue(false);
+    }
+  }
+
+  @Test
+  public void ConcurrentListJobsSuccess() {
+    try {
+      JobRunnable jobRunnable = ConcurrentListJobs(6, config, false, false,
+                listJobHelper.getDelayedResonseAnswer(4, new ArrayList<JobItemBean>()));
+      assertTrue(jobRunnable.exception == null);
+    } catch (Exception e) {
+      assertTrue(false);
+    }
+  }
+
+  @Test
+  public void ConcurrentSubmitJobsSuccess() {
+    try {
+      JobRunnable jobRunnable = SubmitConcurrentJobs(6, config, false, false,
+                submitJobHelper.getDelayedResonseAnswer(4, 0),
+                killJobHelper.getDelayedResonseAnswer(4, null), "job_1000");
+      assertTrue(jobRunnable.exception == null);
+    } catch (Exception e) {
+      assertTrue(false);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreads.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreads.java b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreads.java
new file mode 100644
index 0000000..6f8da40
--- /dev/null
+++ b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreads.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.templeton;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.TimeoutException;
+import org.eclipse.jetty.http.HttpStatus;
+
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import static org.junit.Assert.assertTrue;
+
+/*
+ * Test submission of concurrent job requests with a controlled number of concurrent
+ * requests. Verify that we get a busy exception with the appropriate message.
+ */
+public class TestConcurrentJobRequestsThreads extends ConcurrentJobRequestsTestBase {
+
+  private static AppConfig config;
+  private static QueueStatusBean statusBean;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @BeforeClass
+  public static void setUp() {
+    final String[] args = new String[] {};
+    Main main = new Main(args);
+    config = main.getAppConfigInstance();
+    config.setInt(AppConfig.JOB_STATUS_MAX_THREADS, 5);
+    config.setInt(AppConfig.JOB_LIST_MAX_THREADS, 5);
+    config.setInt(AppConfig.JOB_SUBMIT_MAX_THREADS, 5);
+    statusBean = new QueueStatusBean("job_1000", "Job not found");
+  }
+
+  @Test
+  public void ConcurrentJobsStatusTooManyRequestsException() {
+    try {
+      JobRunnable jobRunnable = ConcurrentJobsStatus(6, config, false, false,
+                statusJobHelper.getDelayedResonseAnswer(4, statusBean));
+      assertTrue(jobRunnable.exception != null);
+      assertTrue(jobRunnable.exception instanceof TooManyRequestsException);
+      TooManyRequestsException ex = (TooManyRequestsException)jobRunnable.exception;
+      assertTrue(ex.httpCode == TooManyRequestsException.TOO_MANY_REQUESTS_429);
+      String expectedMessage = "Unable to service the status job request as templeton service is busy "
+                                 + "with too many status job requests. Please wait for some time before "
+                                 + "retrying the operation. Please refer to the config "
+                                 + "templeton.parallellism.job.status to configure concurrent requests.";
+      assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
+
+      /*
+       * Verify that new job requests have no issues.
+       */
+      jobRunnable = ConcurrentJobsStatus(5, config, false, false,
+                statusJobHelper.getDelayedResonseAnswer(4, statusBean));
+      assertTrue(jobRunnable.exception == null);
+    } catch (Exception e) {
+      assertTrue(false);
+    }
+  }
+
+  @Test
+  public void ConcurrentListJobsTooManyRequestsException() {
+    try {
+      JobRunnable jobRunnable = ConcurrentListJobs(6, config, false, false,
+                listJobHelper.getDelayedResonseAnswer(4, new ArrayList<JobItemBean>()));
+      assertTrue(jobRunnable.exception != null);
+      assertTrue(jobRunnable.exception instanceof TooManyRequestsException);
+      TooManyRequestsException ex = (TooManyRequestsException)jobRunnable.exception;
+      assertTrue(ex.httpCode == TooManyRequestsException.TOO_MANY_REQUESTS_429);
+      String expectedMessage = "Unable to service the list job request as templeton service is busy "
+                               + "with too many list job requests. Please wait for some time before "
+                               + "retrying the operation. Please refer to the config "
+                               + "templeton.parallellism.job.list to configure concurrent requests.";
+      assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
+
+      /*
+       * Verify that new job requests have no issues.
+       */
+      jobRunnable = ConcurrentListJobs(5, config, false, false,
+                listJobHelper.getDelayedResonseAnswer(4, new ArrayList<JobItemBean>()));
+      assertTrue(jobRunnable.exception == null);
+    } catch (Exception e) {
+      assertTrue(false);
+    }
+  }
+
+  @Test
+  public void ConcurrentSubmitJobsTooManyRequestsException() {
+    try {
+      JobRunnable jobRunnable = SubmitConcurrentJobs(6, config, false, false,
+                submitJobHelper.getDelayedResonseAnswer(4, 0),
+                killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000");
+      assertTrue(jobRunnable.exception != null);
+      assertTrue(jobRunnable.exception instanceof TooManyRequestsException);
+      TooManyRequestsException ex = (TooManyRequestsException)jobRunnable.exception;
+      assertTrue(ex.httpCode == TooManyRequestsException.TOO_MANY_REQUESTS_429);
+      String expectedMessage = "Unable to service the submit job request as templeton service is busy "
+                                + "with too many submit job requests. Please wait for some time before "
+                                + "retrying the operation. Please refer to the config "
+                                + "templeton.parallellism.job.submit to configure concurrent requests.";
+      assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
+
+      /*
+       * Verify that new job requests have no issues.
+       */
+      jobRunnable = SubmitConcurrentJobs(5, config, false, false,
+                submitJobHelper.getDelayedResonseAnswer(4, 0),
+                killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000");
+      assertTrue(jobRunnable.exception == null);
+    } catch (Exception e) {
+      assertTrue(false);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreadsAndTimeout.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreadsAndTimeout.java b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreadsAndTimeout.java
new file mode 100644
index 0000000..ef49cbd
--- /dev/null
+++ b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreadsAndTimeout.java
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.templeton;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.TimeoutException;
+import org.eclipse.jetty.http.HttpStatus;
+
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import static org.junit.Assert.assertTrue;
+
+/*
+ * Test submission of concurrent job requests with a controlled number of concurrent
+ * requests and job request execution timeouts. Verify that we get the appropriate
+ * exceptions and exception messages.
+ */
+public class TestConcurrentJobRequestsThreadsAndTimeout extends ConcurrentJobRequestsTestBase {
+
+  private static AppConfig config;
+  private static QueueStatusBean statusBean;
+  private static String statusTooManyRequestsExceptionMessage;
+  private static String listTooManyRequestsExceptionMessage;
+  private static String submitTooManyRequestsExceptionMessage;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @BeforeClass
+  public static void setUp() {
+    final String[] args = new String[] {};
+    Main main = new Main(args);
+    config = main.getAppConfigInstance();
+    config.setInt(AppConfig.JOB_STATUS_MAX_THREADS, 5);
+    config.setInt(AppConfig.JOB_LIST_MAX_THREADS, 5);
+    config.setInt(AppConfig.JOB_SUBMIT_MAX_THREADS, 5);
+    config.setInt(AppConfig.JOB_SUBMIT_TIMEOUT, 5);
+    config.setInt(AppConfig.JOB_STATUS_TIMEOUT, 5);
+    config.setInt(AppConfig.JOB_LIST_TIMEOUT, 5);
+    config.setInt(AppConfig.JOB_TIMEOUT_TASK_RETRY_COUNT, 4);
+    config.setInt(AppConfig.JOB_TIMEOUT_TASK_RETRY_INTERVAL, 1);
+    statusBean = new QueueStatusBean("job_1000", "Job not found");
+
+    statusTooManyRequestsExceptionMessage = "Unable to service the status job request as "
+                                 + "templeton service is busy with too many status job requests. "
+                                 + "Please wait for some time before retrying the operation. "
+                                 + "Please refer to the config templeton.parallellism.job.status "
+                                 + "to configure concurrent requests.";
+    listTooManyRequestsExceptionMessage = "Unable to service the list job request as "
+                                 + "templeton service is busy with too many list job requests. "
+                                 + "Please wait for some time before retrying the operation. "
+                                 + "Please refer to the config templeton.parallellism.job.list "
+                                 + "to configure concurrent requests.";
+    submitTooManyRequestsExceptionMessage = "Unable to service the submit job request as "
+                                 + "templeton service is busy with too many submit job requests. "
+                                 + "Please wait for some time before retrying the operation. "
+                                 + "Please refer to the config templeton.parallellism.job.submit "
+                                 + "to configure concurrent requests.";
+  }
+
+  @Test
+  public void ConcurrentJobsStatusTooManyRequestsException() {
+    try {
+      JobRunnable jobRunnable = ConcurrentJobsStatus(6, config, false, false,
+                statusJobHelper.getDelayedResonseAnswer(4, statusBean));
+      verifyTooManyRequestsException(jobRunnable.exception, this.statusTooManyRequestsExceptionMessage);
+    } catch (Exception e) {
+      assertTrue(false);
+    }
+  }
+
+  @Test
+  public void ConcurrentListJobsTooManyRequestsException() {
+    try {
+      JobRunnable jobRunnable = ConcurrentListJobs(6, config, false, false,
+                listJobHelper.getDelayedResonseAnswer(4, new ArrayList<JobItemBean>()));
+      verifyTooManyRequestsException(jobRunnable.exception, this.listTooManyRequestsExceptionMessage);
+    } catch (Exception e) {
+      assertTrue(false);
+    }
+  }
+
+  @Test
+  public void ConcurrentSubmitJobsTooManyRequestsException() {
+    try {
+      JobRunnable jobRunnable = SubmitConcurrentJobs(6, config, false, false,
+                submitJobHelper.getDelayedResonseAnswer(4, 0),
+                killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000");
+      verifyTooManyRequestsException(jobRunnable.exception, this.submitTooManyRequestsExceptionMessage);
+    } catch (Exception e) {
+      assertTrue(false);
+    }
+  }
+
+  @Test
+  public void ConcurrentJobsStatusTimeOutException() {
+    try {
+      JobRunnable jobRunnable = ConcurrentJobsStatus(5, config, false, false,
+                statusJobHelper.getDelayedResonseAnswer(6, statusBean));
+      assertTrue(jobRunnable.exception != null);
+      assertTrue(jobRunnable.exception instanceof TimeoutException);
+      String expectedMessage = "Status job request got timed out. Please wait for some time before "
+                               + "retrying the operation. Please refer to the config "
+                               + "templeton.job.status.timeout to configure job request time out.";
+      assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
+
+      /*
+       * Verify that new job requests should succeed with no issues.
+       */
+      jobRunnable = ConcurrentJobsStatus(5, config, false, false,
+                statusJobHelper.getDelayedResonseAnswer(0, statusBean));
+      assertTrue(jobRunnable.exception == null);
+    } catch (Exception e) {
+      assertTrue(false);
+    }
+  }
+
+  @Test
+  public void ConcurrentListJobsTimeOutException() {
+    try {
+      JobRunnable jobRunnable = ConcurrentListJobs(5, config, false, false,
+                listJobHelper.getDelayedResonseAnswer(6, new ArrayList<JobItemBean>()));
+      assertTrue(jobRunnable.exception != null);
+      assertTrue(jobRunnable.exception instanceof TimeoutException);
+      String expectedMessage = "List job request got timed out. Please wait for some time before "
+                               + "retrying the operation. Please refer to the config "
+                               + "templeton.job.list.timeout to configure job request time out.";
+
+      assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
+
+      /*
+       * Verify that new job requests should succeed with no issues.
+       */
+      jobRunnable = ConcurrentListJobs(5, config, false, false,
+                listJobHelper.getDelayedResonseAnswer(1, new ArrayList<JobItemBean>()));
+      assertTrue(jobRunnable.exception == null);
+    } catch (Exception e) {
+      assertTrue(false);
+    }
+  }
+
+  @Test
+  public void ConcurrentSubmitJobsTimeOutException() {
+    try {
+      JobRunnable jobRunnable = SubmitConcurrentJobs(5, config, false, false,
+                submitJobHelper.getDelayedResonseAnswer(6, 0),
+                killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000");
+      assertTrue(jobRunnable.exception != null);
+      assertTrue(jobRunnable.exception instanceof QueueException);
+      String expectedMessage = "Submit job request got timed out. Please wait for some time before "
+                               + "retrying the operation. Please refer to the config "
+                               + "templeton.job.submit.timeout to configure job request time out.";
+      assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
+
+      /*
+       * For the submit operation, tasks are not cancelled. Verify that a new job request
+       * fails with TooManyRequestsException.
+       */
+      jobRunnable = SubmitConcurrentJobs(1, config, false, false,
+                submitJobHelper.getDelayedResonseAnswer(0, 0),
+                killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000");
+      verifyTooManyRequestsException(jobRunnable.exception, this.submitTooManyRequestsExceptionMessage);
+
+     /*
+      * Sleep until all threads running cleanup tasks have completed.
+      */
+      Thread.sleep(2000);
+
+      /*
+       * By now the cleanup tasks should have completed. Verify that new job requests succeed with no issues.
+       */
+      jobRunnable = SubmitConcurrentJobs(5, config, false, false,
+                submitJobHelper.getDelayedResonseAnswer(0, 0),
+                killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000");
+      assertTrue(jobRunnable.exception == null);
+    } catch (Exception e) {
+      assertTrue(false);
+    }
+  }
+
+  @Test
+  public void ConcurrentStatusJobsVerifyExceptions() {
+    try {
+      /*
+       * Trigger kill threads and verify that we get an InterruptedException with the expected message.
+       */
+      int timeoutTaskDelay = 4;
+      JobRunnable jobRunnable = ConcurrentJobsStatus(5, config, true, false,
+                statusJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, statusBean));
+      assertTrue(jobRunnable.exception != null);
+      assertTrue(jobRunnable.exception instanceof InterruptedException);
+      String expectedMessage = "Status job request got interrupted. Please wait for some time before "
+                               + "retrying the operation.";
+      assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
+
+      /*
+       * Interrupt all threads and verify that we get an InterruptedException with the expected message.
+       */
+      jobRunnable = ConcurrentJobsStatus(5, config, false, true,
+                statusJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, statusBean));
+      assertTrue(jobRunnable.exception != null);
+      assertTrue(jobRunnable.exception instanceof InterruptedException);
+      assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
+
+      /*
+       * Raise a custom exception (IOException) and verify the expected message.
+       */
+      jobRunnable = ConcurrentJobsStatus(5, config, false, false,
+                                     statusJobHelper.getIOExceptionAnswer());
+      assertTrue(jobRunnable.exception != null);
+      assertTrue(jobRunnable.exception.getCause() instanceof IOException);
+
+      /*
+       * Now new job requests should succeed as status operation has no cancel threads.
+       */
+      jobRunnable = ConcurrentJobsStatus(5, config, false, false,
+                statusJobHelper.getDelayedResonseAnswer(0, statusBean));
+      assertTrue(jobRunnable.exception == null);
+    } catch (Exception e) {
+      assertTrue(false);
+    }
+  }
+
+  @Test
+  public void ConcurrentListJobsVerifyExceptions() {
+    try {
+      /*
+       * Trigger the kill threads and verify we get InterruptedException and the expected message.
+       */
+      int timeoutTaskDelay = 4;
+      JobRunnable jobRunnable = ConcurrentListJobs(5, config, true, false,
+                listJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, new ArrayList<JobItemBean>()));
+      assertTrue(jobRunnable.exception != null);
+      assertTrue(jobRunnable.exception instanceof InterruptedException);
+      String expectedMessage = "List job request got interrupted. Please wait for some time before "
+                               + "retrying the operation.";
+      assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
+
+      /*
+       * Interrupt all threads and verify we get InterruptedException and the expected message.
+       */
+      jobRunnable = ConcurrentListJobs(5, config, false, true,
+                listJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, new ArrayList<JobItemBean>()));
+      assertTrue(jobRunnable.exception != null);
+      assertTrue(jobRunnable.exception instanceof InterruptedException);
+      assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
+
+      /*
+       * Raise a custom exception (IOException) and verify the expected cause.
+       */
+      jobRunnable = ConcurrentListJobs(5, config, false, false,
+                listJobHelper.getIOExceptionAnswer());
+      assertTrue(jobRunnable.exception != null);
+      assertTrue(jobRunnable.exception.getCause() instanceof IOException);
+
+      /*
+       * Now new job requests should succeed as the list operation has no cancel threads.
+       */
+      jobRunnable = ConcurrentListJobs(5, config, false, false,
+                listJobHelper.getDelayedResonseAnswer(0, new ArrayList<JobItemBean>()));
+      assertTrue(jobRunnable.exception == null);
+    } catch (Exception e) {
+      assertTrue(false);
+    }
+  }
+
+  @Test
+  public void ConcurrentSubmitJobsVerifyExceptions() {
+    try {
+      int timeoutTaskDelay = 4;
+
+      /*
+       * Raise a custom exception (IOException) and verify the expected message.
+       * This should not invoke the cancel operation.
+       */
+      JobRunnable jobRunnable = SubmitConcurrentJobs(1, config, false, false,
+                submitJobHelper.getIOExceptionAnswer(),
+                killJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, statusBean), "job_1002");
+      assertTrue(jobRunnable.exception != null);
+      assertTrue(jobRunnable.exception instanceof QueueException);
+      assertTrue(jobRunnable.exception.getMessage().contains("IOException raised manually."));
+
+      /*
+       * Raise an error such as OutOfMemoryError and verify the expected message.
+       * This should not invoke the cancel operation.
+       */
+      jobRunnable = SubmitConcurrentJobs(1, config, false, false,
+                submitJobHelper.getOutOfMemoryErrorAnswer(),
+                killJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, statusBean), "job_1003");
+      assertTrue(jobRunnable.exception != null);
+      assertTrue(jobRunnable.exception instanceof QueueException);
+      assertTrue(jobRunnable.exception.getMessage().contains("OutOfMemoryError raised manually."));
+
+      /*
+       * Trigger the kill threads and verify that we get InterruptedException and the
+       * expected message. This should raise 3 kill operations and ensure that retries
+       * keep the timeout occupied for 4 seconds.
+       */
+      jobRunnable = SubmitConcurrentJobs(3, config, true, false,
+                submitJobHelper.getDelayedResonseAnswer(2, 0),
+                killJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, statusBean), "job_1000");
+      assertTrue(jobRunnable.exception != null);
+      assertTrue(jobRunnable.exception instanceof QueueException);
+      String expectedMessage = "Submit job request got interrupted. Please wait for some time "
+                               + "before retrying the operation.";
+      assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
+
+      /*
+       * Interrupt all threads and verify we get InterruptedException and the expected
+       * message. Also raise 2 kill operations and ensure that retries keep the timeout
+       * occupied for 4 seconds.
+       */
+      jobRunnable = SubmitConcurrentJobs(2, config, false, true,
+                submitJobHelper.getDelayedResonseAnswer(2, 0),
+                killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1001");
+      assertTrue(jobRunnable.exception != null);
+      assertTrue(jobRunnable.exception instanceof QueueException);
+      assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
+
+      /*
+       * For the submit operation, tasks are not cancelled. Verify that a new job
+       * request fails with TooManyRequestsException.
+       */
+      jobRunnable = SubmitConcurrentJobs(1, config, false, false,
+                submitJobHelper.getDelayedResonseAnswer(0, 0),
+                killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1002");
+      verifyTooManyRequestsException(jobRunnable.exception, this.submitTooManyRequestsExceptionMessage);
+
+      /*
+       * Sleep until all threads with clean-up tasks have completed: the timeout task delay,
+       * plus 2 seconds for the task to complete and a 1 second grace period.
+       */
+      Thread.sleep((timeoutTaskDelay + 2 + 1) * 1000);
+
+      /*
+       * Now new job requests should succeed as all cancel threads would have completed.
+       */
+      jobRunnable = SubmitConcurrentJobs(5, config, false, false,
+                submitJobHelper.getDelayedResonseAnswer(0, 0),
+                killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1004");
+      assertTrue(jobRunnable.exception == null);
+    } catch (Exception e) {
+      assertTrue(false);
+    }
+  }
+
+  private void verifyTooManyRequestsException(Throwable exception, String expectedMessage) {
+      assertTrue(exception != null);
+      assertTrue(exception instanceof TooManyRequestsException);
+      TooManyRequestsException ex = (TooManyRequestsException)exception;
+      assertTrue(ex.httpCode == TooManyRequestsException.TOO_MANY_REQUESTS_429);
+      assertTrue(exception.getMessage().contains(expectedMessage));
+  }
+
+}
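
For readers following these tests: what they exercise is a bounded pool of concurrent
job requests combined with a per-request timeout, where an exhausted pool surfaces as
TooManyRequestsException (HTTP 429) and a slow backend call surfaces as a timed-out
QueueException governed by templeton.job.submit.timeout. The sketch below is a minimal
illustration of that pattern only; the class and method names (BoundedJobSubmitter,
submit) are invented here and are not the actual WebHCat implementation.

  import java.util.concurrent.Callable;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import java.util.concurrent.Future;
  import java.util.concurrent.Semaphore;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.TimeoutException;

  /** Illustrative throttle: at most N in-flight requests, each bounded by a timeout. */
  class BoundedJobSubmitter {
    private final ExecutorService pool;
    private final Semaphore slots;       // caps concurrent submit requests
    private final long timeoutSeconds;   // plays the role of templeton.job.submit.timeout

    BoundedJobSubmitter(int maxConcurrentRequests, long timeoutSeconds) {
      this.pool = Executors.newFixedThreadPool(maxConcurrentRequests);
      this.slots = new Semaphore(maxConcurrentRequests);
      this.timeoutSeconds = timeoutSeconds;
    }

    <T> T submit(Callable<T> request) throws Exception {
      if (!slots.tryAcquire()) {
        // Every slot is busy: fail fast, the analogue of the HTTP 429 path above.
        throw new IllegalStateException("Too many concurrent submit requests");
      }
      // The slot is released by the task itself, not by the caller. A request that
      // times out while the task is still running therefore keeps its slot occupied,
      // mirroring the "tasks are not cancelled" behaviour the submit tests rely on.
      Future<T> result = pool.submit(() -> {
        try {
          return request.call();
        } finally {
          slots.release();
        }
      });
      try {
        return result.get(timeoutSeconds, TimeUnit.SECONDS);
      } catch (TimeoutException e) {
        throw new Exception("Submit request timed out", e);
      }
    }
  }

Releasing the slot inside the task rather than in the caller is what makes a timed-out
submit keep its slot busy, which is why the tests above see TooManyRequestsException
immediately after a timeout and only succeed again once they have slept past the task
delay.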

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hplsql/pom.xml
----------------------------------------------------------------------
diff --git a/hplsql/pom.xml b/hplsql/pom.xml
index d1337cb..44da8b2 100644
--- a/hplsql/pom.xml
+++ b/hplsql/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <groupId>org.apache.hive</groupId>
     <artifactId>hive</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>3.0.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hplsql/src/main/java/org/apache/hive/hplsql/Udf.java
----------------------------------------------------------------------
diff --git a/hplsql/src/main/java/org/apache/hive/hplsql/Udf.java b/hplsql/src/main/java/org/apache/hive/hplsql/Udf.java
index 9c29eeb..4901e89 100644
--- a/hplsql/src/main/java/org/apache/hive/hplsql/Udf.java
+++ b/hplsql/src/main/java/org/apache/hive/hplsql/Udf.java
@@ -60,15 +60,7 @@ public class Udf extends GenericUDF {
   @Override
   public Object evaluate(DeferredObject[] arguments) throws HiveException {
     if (exec == null) {
-      exec = new Exec(); 
-      String query = queryOI.getPrimitiveJavaObject(arguments[0].get());
-      String[] args = { "-e", query, "-trace" };
-      try {
-        exec.setUdfRun(true);
-        exec.init(args);
-      } catch (Exception e) {
-        throw new HiveException(e.getMessage());
-      }
+      initExec(arguments);
     }
     if (arguments.length > 1) {
       setParameters(arguments);
@@ -79,6 +71,22 @@ public class Udf extends GenericUDF {
     }
     return null;
   }
+
+  /**
+   * Initialize the HPL/SQL executor (called from evaluate() on the first invocation).
+   */
+  public void initExec(DeferredObject[] arguments) throws HiveException {
+    exec = new Exec();
+    exec.enterGlobalScope();
+    String query = queryOI.getPrimitiveJavaObject(arguments[0].get());
+    String[] args = { "-e", query, "-trace" };
+    try {
+      exec.setUdfRun(true);
+      exec.init(args);
+    } catch (Exception e) {
+      throw new HiveException(e.getMessage());
+    }
+  }
   
   /**
    * Set parameters for the current call

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hplsql/src/test/java/org/apache/hive/hplsql/TestHplsqlUdf.java
----------------------------------------------------------------------
diff --git a/hplsql/src/test/java/org/apache/hive/hplsql/TestHplsqlUdf.java b/hplsql/src/test/java/org/apache/hive/hplsql/TestHplsqlUdf.java
new file mode 100644
index 0000000..3896229
--- /dev/null
+++ b/hplsql/src/test/java/org/apache/hive/hplsql/TestHplsqlUdf.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.hplsql;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredJavaObject;
+
+public class TestHplsqlUdf {
+  StringObjectInspector queryOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
+  ObjectInspector argOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
+
+  /**
+   * Test exec initialization and setParameters without running the query.
+   */
+  @Test
+  public void testEvaluateWithoutRun() throws HiveException {
+    // init udf
+    Udf udf = new Udf();
+    ObjectInspector[] initArguments = {queryOI, argOI};
+    udf.initialize(initArguments);
+    // set up the deferred arguments: the query string and one bound parameter
+    DeferredObject queryObj = new DeferredJavaObject("hello(:1)");
+    DeferredObject argObj = new DeferredJavaObject("name");
+    DeferredObject[] argumentsObj = {queryObj, argObj};
+
+    // initialize exec and bind the parameters
+    udf.initExec(argumentsObj);
+    udf.setParameters(argumentsObj);
+
+    // check that the variable exists and holds the expected value
+    Var var = udf.exec.findVariable(":1");
+    Assert.assertNotNull(var);
+    String val = (String) var.value;
+    Assert.assertEquals("name", val);
+  }
+}
+
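
The test above binds a single parameter. Assuming setParameters registers the i-th
additional argument under the variable name ":i" (the ":1" case above suggests this,
but setParameters itself is not shown in this diff), a companion test for multiple
parameters might look like the sketch below. It is illustrative only, is not part of
this commit, and is written as an extra method of the TestHplsqlUdf class above so it
can reuse that class's imports and fields.

  @Test
  public void testEvaluateWithTwoParameters() throws HiveException {
    Udf udf = new Udf();
    ObjectInspector[] initArguments = {queryOI, argOI, argOI};
    udf.initialize(initArguments);

    // query string plus two bound parameters
    DeferredObject[] argumentsObj = {
      new DeferredJavaObject("hello(:1, :2)"),
      new DeferredJavaObject("first"),
      new DeferredJavaObject("second")
    };
    udf.initExec(argumentsObj);
    udf.setParameters(argumentsObj);

    // both placeholders should now be registered with their bound values
    Var first = udf.exec.findVariable(":1");
    Var second = udf.exec.findVariable(":2");
    Assert.assertNotNull(first);
    Assert.assertNotNull(second);
    Assert.assertEquals("first", (String) first.value);
    Assert.assertEquals("second", (String) second.value);
  }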

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/itests/custom-serde/pom.xml
----------------------------------------------------------------------
diff --git a/itests/custom-serde/pom.xml b/itests/custom-serde/pom.xml
index 166ffde..78b68c5 100644
--- a/itests/custom-serde/pom.xml
+++ b/itests/custom-serde/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <groupId>org.apache.hive</groupId>
     <artifactId>hive-it</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>3.0.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/itests/custom-udfs/pom.xml
----------------------------------------------------------------------
diff --git a/itests/custom-udfs/pom.xml b/itests/custom-udfs/pom.xml
index b230b41..de7df16 100644
--- a/itests/custom-udfs/pom.xml
+++ b/itests/custom-udfs/pom.xml
@@ -19,7 +19,7 @@ limitations under the License.
   <parent>
     <groupId>org.apache.hive</groupId>
     <artifactId>hive-it</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>3.0.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/itests/custom-udfs/udf-classloader-udf1/pom.xml
----------------------------------------------------------------------
diff --git a/itests/custom-udfs/udf-classloader-udf1/pom.xml b/itests/custom-udfs/udf-classloader-udf1/pom.xml
index 0a95c94..f863efd 100644
--- a/itests/custom-udfs/udf-classloader-udf1/pom.xml
+++ b/itests/custom-udfs/udf-classloader-udf1/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <groupId>org.apache.hive</groupId>
     <artifactId>hive-it-custom-udfs</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>3.0.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/itests/custom-udfs/udf-classloader-udf2/pom.xml
----------------------------------------------------------------------
diff --git a/itests/custom-udfs/udf-classloader-udf2/pom.xml b/itests/custom-udfs/udf-classloader-udf2/pom.xml
index e3f30f1..2553f3e 100644
--- a/itests/custom-udfs/udf-classloader-udf2/pom.xml
+++ b/itests/custom-udfs/udf-classloader-udf2/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <groupId>org.apache.hive</groupId>
     <artifactId>hive-it-custom-udfs</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>3.0.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/itests/custom-udfs/udf-classloader-util/pom.xml
----------------------------------------------------------------------
diff --git a/itests/custom-udfs/udf-classloader-util/pom.xml b/itests/custom-udfs/udf-classloader-util/pom.xml
index fe285d7..565a661 100644
--- a/itests/custom-udfs/udf-classloader-util/pom.xml
+++ b/itests/custom-udfs/udf-classloader-util/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <groupId>org.apache.hive</groupId>
     <artifactId>hive-it-custom-udfs</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>3.0.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/itests/custom-udfs/udf-vectorized-badexample/pom.xml
----------------------------------------------------------------------
diff --git a/itests/custom-udfs/udf-vectorized-badexample/pom.xml b/itests/custom-udfs/udf-vectorized-badexample/pom.xml
index 35c1a2f..6dc923d 100644
--- a/itests/custom-udfs/udf-vectorized-badexample/pom.xml
+++ b/itests/custom-udfs/udf-vectorized-badexample/pom.xml
@@ -19,7 +19,7 @@
   <parent>
     <groupId>org.apache.hive</groupId>
     <artifactId>hive-it-custom-udfs</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>3.0.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/itests/hcatalog-unit/pom.xml
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/pom.xml b/itests/hcatalog-unit/pom.xml
index 3ef87f9..c157aed 100644
--- a/itests/hcatalog-unit/pom.xml
+++ b/itests/hcatalog-unit/pom.xml
@@ -25,7 +25,7 @@
   <parent>
     <groupId>org.apache.hive</groupId>
     <artifactId>hive-it</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
+    <version>3.0.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
index 8468b84..93ff498 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.metastore.RawStore;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
@@ -915,6 +916,12 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable {
   }
 
   @Override
+  public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+      String tableName) throws MetaException, NoSuchObjectException {
+    return objectStore.getAggrColStatsForTablePartitions(dbName, tableName);
+  }
+
+  @Override
   @CanNotRetry
   public Boolean commitTransactionExpectDeadlock() {
     return null;