You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2017/05/05 17:32:28 UTC
[41/51] [partial] hive git commit: HIVE-14671 : merge master into
hive-14535 (Wei Zheng)
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
index aeb89df..663e4b6 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
@@ -51,7 +51,7 @@ public class PigDelegator extends LauncherDelegator {
boolean usesHcatalog, String completedUrl, boolean enablelog,
Boolean enableJobReconnect)
throws NotAuthorizedException, BadParam, BusyException, QueueException,
- ExecuteException, IOException, InterruptedException {
+ ExecuteException, IOException, InterruptedException, TooManyRequestsException {
runAs = user;
List<String> args = makeArgs(execute,
srcFile, pigArgs,
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SecureProxySupport.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SecureProxySupport.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SecureProxySupport.java
index 5aed3b3..793881b 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SecureProxySupport.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SecureProxySupport.java
@@ -170,6 +170,7 @@ public class SecureProxySupport {
return null;
}
});
+ FileSystem.closeAllForUGI(ugi);
return twrapper.tokens;
}
private static void collectTokens(FileSystem fs, TokenWrapper twrapper, Credentials creds, String userName) throws IOException {
@@ -204,6 +205,7 @@ public class SecureProxySupport {
return null;
}
});
+ FileSystem.closeAllForUGI(ugi);
}
@@ -220,6 +222,7 @@ public class SecureProxySupport {
return client.getDelegationToken(c.getUser(), u);
}
});
+ FileSystem.closeAllForUGI(ugi);
return s;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
index 2da0204..43a7d57 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
@@ -27,6 +27,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.servlet.http.HttpServletRequest;
@@ -650,7 +652,7 @@ public class Server {
@FormParam("enablelog") boolean enablelog,
@FormParam("enablejobreconnect") Boolean enablejobreconnect)
throws NotAuthorizedException, BusyException, BadParam, QueueException,
- ExecuteException, IOException, InterruptedException {
+ ExecuteException, IOException, InterruptedException, TooManyRequestsException {
verifyUser();
verifyParam(inputs, "input");
verifyParam(mapper, "mapper");
@@ -704,7 +706,7 @@ public class Server {
@FormParam("enablelog") boolean enablelog,
@FormParam("enablejobreconnect") Boolean enablejobreconnect)
throws NotAuthorizedException, BusyException, BadParam, QueueException,
- ExecuteException, IOException, InterruptedException {
+ ExecuteException, IOException, InterruptedException, TooManyRequestsException {
verifyUser();
verifyParam(jar, "jar");
verifyParam(mainClass, "class");
@@ -754,7 +756,7 @@ public class Server {
@FormParam("enablelog") boolean enablelog,
@FormParam("enablejobreconnect") Boolean enablejobreconnect)
throws NotAuthorizedException, BusyException, BadParam, QueueException,
- ExecuteException, IOException, InterruptedException {
+ ExecuteException, IOException, InterruptedException, TooManyRequestsException {
verifyUser();
if (execute == null && srcFile == null) {
throw new BadParam("Either execute or file parameter required");
@@ -805,7 +807,7 @@ public class Server {
@FormParam("enablelog") boolean enablelog,
@FormParam("enablejobreconnect") Boolean enablejobreconnect)
throws NotAuthorizedException, BusyException, BadParam, QueueException,
- IOException, InterruptedException {
+ IOException, InterruptedException, TooManyRequestsException {
verifyUser();
if (command == null && optionsFile == null)
throw new BadParam("Must define Sqoop command or a optionsfile contains Sqoop command to run Sqoop job.");
@@ -859,7 +861,7 @@ public class Server {
@FormParam("enablelog") boolean enablelog,
@FormParam("enablejobreconnect") Boolean enablejobreconnect)
throws NotAuthorizedException, BusyException, BadParam, QueueException,
- ExecuteException, IOException, InterruptedException {
+ ExecuteException, IOException, InterruptedException, TooManyRequestsException {
verifyUser();
if (execute == null && srcFile == null) {
throw new BadParam("Either execute or file parameter required");
@@ -891,7 +893,8 @@ public class Server {
@Path("jobs/{jobid}")
@Produces({MediaType.APPLICATION_JSON})
public QueueStatusBean showJobId(@PathParam("jobid") String jobid)
- throws NotAuthorizedException, BadParam, IOException, InterruptedException {
+ throws NotAuthorizedException, BadParam, IOException, InterruptedException,
+ BusyException, TimeoutException, ExecutionException, TooManyRequestsException {
verifyUser();
verifyParam(jobid, ":jobid");
@@ -968,7 +971,8 @@ public class Server {
@QueryParam("showall") boolean showall,
@QueryParam("jobid") String jobid,
@QueryParam("numrecords") String numrecords)
- throws NotAuthorizedException, BadParam, IOException, InterruptedException {
+ throws NotAuthorizedException, BadParam, IOException, InterruptedException,
+ BusyException, TimeoutException, ExecutionException, TooManyRequestsException {
verifyUser();
@@ -980,19 +984,14 @@ public class Server {
showDetails = true;
}
- ListDelegator ld = new ListDelegator(appConf);
- List<String> list = ld.run(getDoAsUser(), showall);
- List<JobItemBean> detailList = new ArrayList<JobItemBean>();
- int currRecord = 0;
int numRecords;
-
// Parse numrecords to an integer
try {
if (numrecords != null) {
numRecords = Integer.parseInt(numrecords);
- if (numRecords <= 0) {
- throw new BadParam("numrecords should be an integer > 0");
- }
+ if (numRecords <= 0) {
+ throw new BadParam("numrecords should be an integer > 0");
+ }
}
else {
numRecords = -1;
@@ -1002,57 +1001,8 @@ public class Server {
throw new BadParam("Invalid numrecords format: numrecords should be an integer > 0");
}
- // Sort the list as requested
- boolean isAscendingOrder = true;
- switch (appConf.getListJobsOrder()) {
- case lexicographicaldesc:
- Collections.sort(list, Collections.reverseOrder());
- isAscendingOrder = false;
- break;
- case lexicographicalasc:
- default:
- Collections.sort(list);
- break;
- }
-
- for (String job : list) {
- // If numRecords = -1, fetch all records.
- // Hence skip all the below checks when numRecords = -1.
- if (numRecords != -1) {
- // If currRecord >= numRecords, we have already fetched the top #numRecords
- if (currRecord >= numRecords) {
- break;
- }
- else if (jobid == null || jobid.trim().length() == 0) {
- currRecord++;
- }
- // If the current record needs to be returned based on the
- // filter conditions specified by the user, increment the counter
- else if (isAscendingOrder && job.compareTo(jobid) > 0 || !isAscendingOrder && job.compareTo(jobid) < 0) {
- currRecord++;
- }
- // The current record should not be included in the output detailList.
- else {
- continue;
- }
- }
- JobItemBean jobItem = new JobItemBean();
- jobItem.id = job;
- if (showDetails) {
- StatusDelegator sd = new StatusDelegator(appConf);
- try {
- jobItem.detail = sd.run(getDoAsUser(), job);
- }
- catch(Exception ex) {
- /*if we could not get status for some reason, log it, and send empty status back with
- * just the ID so that caller knows to even look in the log file*/
- LOG.info("Failed to get status detail for jobId='" + job + "'", ex);
- jobItem.detail = new QueueStatusBean(job, "Failed to retrieve status; see WebHCat logs");
- }
- }
- detailList.add(jobItem);
- }
- return detailList;
+ ListDelegator ld = new ListDelegator(appConf);
+ return ld.run(getDoAsUser(), showall, jobid, numRecords, showDetails);
}
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java
index fde5f60..eb84fb2 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java
@@ -50,7 +50,7 @@ public class SqoopDelegator extends LauncherDelegator {
String callback, String completedUrl, boolean enablelog,
Boolean enableJobReconnect, String libdir)
throws NotAuthorizedException, BadParam, BusyException, QueueException,
- IOException, InterruptedException
+ IOException, InterruptedException, TooManyRequestsException
{
if(TempletonUtils.isset(appConf.sqoopArchive())) {
if(!TempletonUtils.isset(appConf.sqoopPath()) && !TempletonUtils.isset(appConf.sqoopHome())) {
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java
index fac0170..c042ae8 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java
@@ -19,10 +19,13 @@
package org.apache.hive.hcatalog.templeton;
import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.mapred.JobID;
@@ -41,18 +44,78 @@ import org.apache.hive.hcatalog.templeton.tool.JobState;
*/
public class StatusDelegator extends TempletonDelegator {
private static final Logger LOG = LoggerFactory.getLogger(StatusDelegator.class);
+ private final String JOB_STATUS_EXECUTE_THREAD_PREFIX = "JobStatusExecute";
+
+ /**
+ * Current thread id used to set in execution threads.
+ */
+ private final String statusThreadId = Thread.currentThread().getName();
+
+ /*
+ * Job status request executor to get status of a job.
+ */
+ private static JobRequestExecutor<QueueStatusBean> jobRequest =
+ new JobRequestExecutor<QueueStatusBean>(JobRequestExecutor.JobRequestType.Status,
+ AppConfig.JOB_STATUS_MAX_THREADS, AppConfig.JOB_STATUS_TIMEOUT);
public StatusDelegator(AppConfig appConf) {
super(appConf);
}
- public QueueStatusBean run(String user, String id)
+ /*
+ * Gets status of job form job id. If maximum concurrent job status requests are configured
+ * then status request will be executed on a thread from thread pool. If job status request
+ * time out is configured then request execution thread will be interrupted if thread
+ * times out and does no action.
+ */
+ public QueueStatusBean run(final String user, final String id, boolean enableThreadPool)
+ throws NotAuthorizedException, BadParam, IOException, InterruptedException,
+ BusyException, TimeoutException, ExecutionException, TooManyRequestsException {
+ if (jobRequest.isThreadPoolEnabled() && enableThreadPool) {
+ return jobRequest.execute(getJobStatusCallableTask(user, id));
+ } else {
+ return getJobStatus(user, id);
+ }
+ }
+
+ /*
+ * Job callable task for job status operation. Overrides behavior of execute() to get
+ * status of a job. No need to override behavior of cleanup() as there is nothing to be
+ * done if job sttaus operation is timed out or interrupted.
+ */
+ private JobCallable<QueueStatusBean> getJobStatusCallableTask(final String user,
+ final String id) {
+ return new JobCallable<QueueStatusBean>() {
+ @Override
+ public QueueStatusBean execute() throws NotAuthorizedException, BadParam, IOException,
+ InterruptedException, BusyException {
+ /*
+ * Change the current thread name to include parent thread Id if it is executed
+ * in thread pool. Useful to extract logs specific to a job request and helpful
+ * to debug job issues.
+ */
+ Thread.currentThread().setName(String.format("%s-%s-%s", JOB_STATUS_EXECUTE_THREAD_PREFIX,
+ statusThreadId, Thread.currentThread().getId()));
+
+ return getJobStatus(user, id);
+ }
+ };
+ }
+
+ public QueueStatusBean run(final String user, final String id)
+ throws NotAuthorizedException, BadParam, IOException, InterruptedException,
+ BusyException, TimeoutException, ExecutionException, TooManyRequestsException {
+ return run(user, id, true);
+ }
+
+ public QueueStatusBean getJobStatus(String user, String id)
throws NotAuthorizedException, BadParam, IOException, InterruptedException
{
WebHCatJTShim tracker = null;
JobState state = null;
+ UserGroupInformation ugi = null;
try {
- UserGroupInformation ugi = UgiFactory.getUgi(user);
+ ugi = UgiFactory.getUgi(user);
tracker = ShimLoader.getHadoopShims().getWebHCatShim(appConf, ugi);
JobID jobid = StatusDelegator.StringToJobID(id);
if (jobid == null)
@@ -66,6 +129,8 @@ public class StatusDelegator extends TempletonDelegator {
tracker.close();
if (state != null)
state.close();
+ if (ugi != null)
+ FileSystem.closeAllForUGI(ugi);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java
index 839b56a..590e49f 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java
@@ -51,7 +51,7 @@ public class StreamingDelegator extends LauncherDelegator {
Boolean enableJobReconnect,
JobType jobType)
throws NotAuthorizedException, BadParam, BusyException, QueueException,
- ExecuteException, IOException, InterruptedException {
+ ExecuteException, IOException, InterruptedException, TooManyRequestsException {
List<String> args = makeArgs(inputs, inputreader, output, mapper, reducer, combiner,
fileList, cmdenvs, jarArgs);
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TooManyRequestsException.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TooManyRequestsException.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TooManyRequestsException.java
new file mode 100644
index 0000000..9d55ad4
--- /dev/null
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TooManyRequestsException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.templeton;
+
+/**
+ * Raise this exception if web service is busy with existing requests and not able
+ * service new requests.
+ */
+public class TooManyRequestsException extends SimpleWebException {
+ /*
+ * The current version of jetty server doesn't have the status
+ * HttpStatus.TOO_MANY_REQUESTS_429. Hence, passing this as constant.
+ */
+ public static int TOO_MANY_REQUESTS_429 = 429;
+
+ public TooManyRequestsException(String msg) {
+ super(TOO_MANY_REQUESTS_429, msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
index 15ab8b9..f4c4b76 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
@@ -81,9 +81,14 @@ public class TempletonControllerJob extends Configured implements Tool, JobSubmi
this.appConf = conf;
}
- private JobID submittedJobId;
+ private Job job = null;
public String getSubmittedId() {
+ if (job == null ) {
+ return null;
+ }
+
+ JobID submittedJobId = job.getJobID();
if (submittedJobId == null) {
return null;
} else {
@@ -119,7 +124,7 @@ public class TempletonControllerJob extends Configured implements Tool, JobSubmi
String user = UserGroupInformation.getCurrentUser().getShortUserName();
conf.set("user.name", user);
- Job job = new Job(conf);
+ job = new Job(conf);
job.setJarByClass(LaunchMapper.class);
job.setJobName(TempletonControllerJob.class.getSimpleName());
job.setMapperClass(LaunchMapper.class);
@@ -141,7 +146,7 @@ public class TempletonControllerJob extends Configured implements Tool, JobSubmi
job.submit();
- submittedJobId = job.getJobID();
+ JobID submittedJobId = job.getJobID();
if(metastoreTokenStrForm != null) {
//so that it can be cancelled later from CompleteDelegator
DelegationTokenCache.getStringFormTokenCache().storeDelegationToken(
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java
index 07b005b..e0ccc70 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java
@@ -362,6 +362,7 @@ public class TempletonUtils {
if (hadoopFsIsMissing(defaultFs, p))
throw new FileNotFoundException("File " + fname + " does not exist.");
+ FileSystem.closeAllForUGI(ugi);
return p;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/ConcurrentJobRequestsTestBase.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/ConcurrentJobRequestsTestBase.java b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/ConcurrentJobRequestsTestBase.java
new file mode 100644
index 0000000..5fcae46
--- /dev/null
+++ b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/ConcurrentJobRequestsTestBase.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.templeton;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.Future;
+
+import org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
+
+/*
+ * Base class for mocking job operations with concurrent requests.
+ */
+public class ConcurrentJobRequestsTestBase {
+ private static final Logger LOG = LoggerFactory.getLogger(ConcurrentJobRequestsTestBase.class);
+ private boolean started = false;
+ private Object lock = new Object();
+
+ MockAnswerTestHelper<QueueStatusBean> statusJobHelper = new MockAnswerTestHelper<QueueStatusBean>();
+ MockAnswerTestHelper<QueueStatusBean> killJobHelper = new MockAnswerTestHelper<QueueStatusBean>();
+ MockAnswerTestHelper<List<JobItemBean>> listJobHelper = new MockAnswerTestHelper<List<JobItemBean>>();
+ MockAnswerTestHelper<Integer> submitJobHelper = new MockAnswerTestHelper<Integer>();
+
+ /*
+ * Waits for other threads to join and returns with its Id.
+ */
+ private int waitForAllThreadsToStart(JobRunnable jobRunnable, int poolThreadCount) {
+ int currentId = jobRunnable.threadStartCount.incrementAndGet();
+ LOG.info("Waiting for other threads with thread id: " + currentId);
+ synchronized(lock) {
+ /*
+ * We need a total of poolThreadCount + 1 threads to start at same. There are
+ * poolThreadCount threads in thread pool and another one which has started them.
+ * The thread which sees atomic counter as poolThreadCount+1 is the last thread`
+ * to join and wake up all threads to start all at once.
+ */
+ if (currentId > poolThreadCount) {
+ LOG.info("Waking up all threads: " + currentId);
+ started = true;
+ this.lock.notifyAll();
+ } else {
+ while (!started) {
+ try {
+ this.lock.wait();
+ } catch (InterruptedException ignore) {
+ }
+ }
+ }
+ }
+
+ return currentId;
+ }
+
+ public JobRunnable ConcurrentJobsStatus(final int threadCount, AppConfig appConfig,
+ final boolean killThreads, boolean interruptThreads, final Answer<QueueStatusBean> answer)
+ throws IOException, InterruptedException, QueueException, NotAuthorizedException,
+ BadParam, BusyException {
+
+ StatusDelegator delegator = new StatusDelegator(appConfig);
+ final StatusDelegator mockDelegator = Mockito.spy(delegator);
+
+ Mockito.doAnswer(answer).when(mockDelegator).getJobStatus(Mockito.any(String.class),
+ Mockito.any(String.class));
+
+ JobRunnable statusJobRunnable = new JobRunnable() {
+ @Override
+ public void run() {
+ try {
+ int threadId = waitForAllThreadsToStart(this, threadCount);
+ LOG.info("Started executing Job Status operation. ThreadId : " + threadId);
+ mockDelegator.run("admin", "job_1000" + threadId);
+ } catch (Exception ex) {
+ exception = ex;
+ }
+ }
+ };
+
+ executeJobOperations(statusJobRunnable, threadCount, killThreads, interruptThreads);
+ return statusJobRunnable;
+ }
+
+ public JobRunnable ConcurrentListJobs(final int threadCount, AppConfig config,
+ final boolean killThreads, boolean interruptThreads, final Answer<List<JobItemBean>> answer)
+ throws IOException, InterruptedException, QueueException, NotAuthorizedException,
+ BadParam, BusyException {
+
+ ListDelegator delegator = new ListDelegator(config);
+ final ListDelegator mockDelegator = Mockito.spy(delegator);
+
+ Mockito.doAnswer(answer).when(mockDelegator).listJobs(Mockito.any(String.class),
+ Mockito.any(boolean.class), Mockito.any(String.class),
+ Mockito.any(int.class), Mockito.any(boolean.class));
+
+ JobRunnable listJobRunnable = new JobRunnable() {
+ @Override
+ public void run() {
+ try {
+ int threadId = waitForAllThreadsToStart(this, threadCount);
+ LOG.info("Started executing Job List operation. ThreadId : " + threadId);
+ mockDelegator.run("admin", true, "", 10, true);
+ } catch (Exception ex) {
+ exception = ex;
+ }
+ }
+ };
+
+ executeJobOperations(listJobRunnable, threadCount, killThreads, interruptThreads);
+ return listJobRunnable;
+ }
+
+ public JobRunnable SubmitConcurrentJobs(final int threadCount, AppConfig config,
+ final boolean killThreads, boolean interruptThreads, final Answer<Integer> responseAnswer,
+ final Answer<QueueStatusBean> timeoutResponseAnswer, final String jobIdResponse)
+ throws IOException, InterruptedException, QueueException, NotAuthorizedException,
+ BusyException, TimeoutException, Exception {
+
+ LauncherDelegator delegator = new LauncherDelegator(config);
+ final LauncherDelegator mockDelegator = Mockito.spy(delegator);
+ final List<String> listArgs = new ArrayList<String>();
+
+ TempletonControllerJob mockCtrl = Mockito.mock(TempletonControllerJob.class);
+
+ Mockito.doReturn(jobIdResponse).when(mockCtrl).getSubmittedId();
+
+ Mockito.doReturn(mockCtrl).when(mockDelegator).getTempletonController();
+
+ Mockito.doAnswer(responseAnswer).when(mockDelegator).runTempletonControllerJob(
+ Mockito.any(TempletonControllerJob.class), Mockito.any(List.class));
+
+ Mockito.doAnswer(timeoutResponseAnswer).when(mockDelegator).killJob(
+ Mockito.any(String.class), Mockito.any(String.class));
+
+ Mockito.doNothing().when(mockDelegator).registerJob(Mockito.any(String.class),
+ Mockito.any(String.class), Mockito.any(String.class), Mockito.any(Map.class));
+
+ JobRunnable submitJobRunnable = new JobRunnable() {
+ @Override
+ public void run() {
+ try {
+ int threadId = waitForAllThreadsToStart(this, threadCount);
+ LOG.info("Started executing Job Submit operation. ThreadId : " + threadId);
+ mockDelegator.enqueueController("admin", null, "", listArgs);
+ } catch (Throwable ex) {
+ exception = ex;
+ }
+ }
+ };
+
+ executeJobOperations(submitJobRunnable, threadCount, killThreads, interruptThreads);
+ return submitJobRunnable;
+ }
+
+ public void executeJobOperations(JobRunnable jobRunnable, int threadCount, boolean killThreads,
+ boolean interruptThreads)
+ throws IOException, InterruptedException, QueueException, NotAuthorizedException {
+
+ started = false;
+
+ ExecutorService executorService = new ThreadPoolExecutor(threadCount, threadCount, 0L,
+ TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());;
+
+ ArrayList<Future<?>> futures = new ArrayList<Future<?>>();
+ for (int i = 0; i < threadCount; i++) {
+ futures.add(executorService.submit(jobRunnable));
+ }
+
+ waitForAllThreadsToStart(jobRunnable, threadCount);
+ LOG.info("Started all threads ");
+
+ if (killThreads) {
+ executorService.shutdownNow();
+ } else {
+ if (interruptThreads){
+ for (Future<?> future : futures) {
+ LOG.info("Cancelling the thread");
+ future.cancel(true);
+ }
+ }
+
+ executorService.shutdown();
+ }
+
+ /*
+ * For both graceful or forceful shutdown, wait for tasks to terminate such that
+ * appropriate exceptions are raised and stored in JobRunnable.exception.
+ */
+ if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
+ LOG.info("Force Shutting down the pool\n");
+ if (!killThreads) {
+ /*
+ * killThreads option has already done force shutdown. No need to do again.
+ */
+ executorService.shutdownNow();
+ }
+ }
+ }
+
+ public abstract class JobRunnable implements Runnable {
+ public volatile Throwable exception = null;
+ public AtomicInteger threadStartCount = new AtomicInteger(0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/MockAnswerTestHelper.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/MockAnswerTestHelper.java b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/MockAnswerTestHelper.java
new file mode 100644
index 0000000..9f1744e
--- /dev/null
+++ b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/MockAnswerTestHelper.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.templeton;
+
+import java.io.IOException;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/*
+ * Helper class to generate mocked response.
+ */
+public class MockAnswerTestHelper<T> {
+ public Answer<T> getIOExceptionAnswer() {
+ return new Answer<T>() {
+ @Override
+ public T answer(InvocationOnMock invocation) throws Exception {
+ throw new IOException("IOException raised manually.");
+ }
+ };
+ }
+
+ public Answer<T> getOutOfMemoryErrorAnswer() {
+ return new Answer<T>() {
+ @Override
+ public T answer(InvocationOnMock invocation) throws OutOfMemoryError {
+ throw new OutOfMemoryError("OutOfMemoryError raised manually.");
+ }
+ };
+ }
+
+ public Answer<T> getDelayedResonseAnswer(final int delayInSeconds, final T response) {
+ return new Answer<T>() {
+ @Override
+ public T answer(InvocationOnMock invocation) throws InterruptedException {
+ Thread.sleep(1000 * delayInSeconds);
+ return response;
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequests.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequests.java b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequests.java
new file mode 100644
index 0000000..695dcc6
--- /dev/null
+++ b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequests.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.templeton;
+
+import java.util.ArrayList;
+
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import static org.junit.Assert.assertTrue;
+
+/*
+ * Test submission of concurrent job requests.
+ */
+public class TestConcurrentJobRequests extends ConcurrentJobRequestsTestBase {
+
+ private static AppConfig config;
+
+ @Rule
+ public ExpectedException exception = ExpectedException.none();
+
+ @BeforeClass
+ public static void setUp() {
+ final String[] args = new String[] {};
+ Main main = new Main(args);
+ config = main.getAppConfigInstance();
+ }
+
+ @Test
+ public void ConcurrentJobsStatusSuccess() {
+ try {
+ JobRunnable jobRunnable = ConcurrentJobsStatus(6, config, false, false,
+ statusJobHelper.getDelayedResonseAnswer(4, new QueueStatusBean("job_1000", "Job not found")));
+ assertTrue(jobRunnable.exception == null);
+ } catch (Exception e) {
+ assertTrue(false);
+ }
+ }
+
+ @Test
+ public void ConcurrentListJobsSuccess() {
+ try {
+ JobRunnable jobRunnable = ConcurrentListJobs(6, config, false, false,
+ listJobHelper.getDelayedResonseAnswer(4, new ArrayList<JobItemBean>()));
+ assertTrue(jobRunnable.exception == null);
+ } catch (Exception e) {
+ assertTrue(false);
+ }
+ }
+
+ @Test
+ public void ConcurrentSubmitJobsSuccess() {
+ try {
+ JobRunnable jobRunnable = SubmitConcurrentJobs(6, config, false, false,
+ submitJobHelper.getDelayedResonseAnswer(4, 0),
+ killJobHelper.getDelayedResonseAnswer(4, null), "job_1000");
+ assertTrue(jobRunnable.exception == null);
+ } catch (Exception e) {
+ assertTrue(false);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreads.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreads.java b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreads.java
new file mode 100644
index 0000000..6f8da40
--- /dev/null
+++ b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreads.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.templeton;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.TimeoutException;
+import org.eclipse.jetty.http.HttpStatus;
+
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import static org.junit.Assert.assertTrue;
+
+/*
+ * Test submission of concurrent job requests with the controlled number of concurrent
+ * Requests. Verify that we get busy exception and appropriate message.
+ */
+public class TestConcurrentJobRequestsThreads extends ConcurrentJobRequestsTestBase {
+
+ private static AppConfig config;
+ private static QueueStatusBean statusBean;
+
+ @Rule
+ public ExpectedException exception = ExpectedException.none();
+
+ @BeforeClass
+ public static void setUp() {
+ final String[] args = new String[] {};
+ Main main = new Main(args);
+ config = main.getAppConfigInstance();
+ config.setInt(AppConfig.JOB_STATUS_MAX_THREADS, 5);
+ config.setInt(AppConfig.JOB_LIST_MAX_THREADS, 5);
+ config.setInt(AppConfig.JOB_SUBMIT_MAX_THREADS, 5);
+ statusBean = new QueueStatusBean("job_1000", "Job not found");
+ }
+
+ @Test
+ public void ConcurrentJobsStatusTooManyRequestsException() {
+ try {
+ JobRunnable jobRunnable = ConcurrentJobsStatus(6, config, false, false,
+ statusJobHelper.getDelayedResonseAnswer(4, statusBean));
+ assertTrue(jobRunnable.exception != null);
+ assertTrue(jobRunnable.exception instanceof TooManyRequestsException);
+ TooManyRequestsException ex = (TooManyRequestsException)jobRunnable.exception;
+ assertTrue(ex.httpCode == TooManyRequestsException.TOO_MANY_REQUESTS_429);
+ String expectedMessage = "Unable to service the status job request as templeton service is busy "
+ + "with too many status job requests. Please wait for some time before "
+ + "retrying the operation. Please refer to the config "
+ + "templeton.parallellism.job.status to configure concurrent requests.";
+ assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
+
+ /*
+ * Verify that new job requests have no issues.
+ */
+ jobRunnable = ConcurrentJobsStatus(5, config, false, false,
+ statusJobHelper.getDelayedResonseAnswer(4, statusBean));
+ assertTrue(jobRunnable.exception == null);
+ } catch (Exception e) {
+ assertTrue(false);
+ }
+ }
+
+ @Test
+ public void ConcurrentListJobsTooManyRequestsException() {
+ try {
+ JobRunnable jobRunnable = ConcurrentListJobs(6, config, false, false,
+ listJobHelper.getDelayedResonseAnswer(4, new ArrayList<JobItemBean>()));
+ assertTrue(jobRunnable.exception != null);
+ assertTrue(jobRunnable.exception instanceof TooManyRequestsException);
+ TooManyRequestsException ex = (TooManyRequestsException)jobRunnable.exception;
+ assertTrue(ex.httpCode == TooManyRequestsException.TOO_MANY_REQUESTS_429);
+ String expectedMessage = "Unable to service the list job request as templeton service is busy "
+ + "with too many list job requests. Please wait for some time before "
+ + "retrying the operation. Please refer to the config "
+ + "templeton.parallellism.job.list to configure concurrent requests.";
+ assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
+
+ /*
+ * Verify that new job requests have no issues.
+ */
+ jobRunnable = ConcurrentListJobs(5, config, false, false,
+ listJobHelper.getDelayedResonseAnswer(4, new ArrayList<JobItemBean>()));
+ assertTrue(jobRunnable.exception == null);
+ } catch (Exception e) {
+ assertTrue(false);
+ }
+ }
+
+ @Test
+ public void ConcurrentSubmitJobsTooManyRequestsException() {
+ try {
+ JobRunnable jobRunnable = SubmitConcurrentJobs(6, config, false, false,
+ submitJobHelper.getDelayedResonseAnswer(4, 0),
+ killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000");
+ assertTrue(jobRunnable.exception != null);
+ assertTrue(jobRunnable.exception instanceof TooManyRequestsException);
+ TooManyRequestsException ex = (TooManyRequestsException)jobRunnable.exception;
+ assertTrue(ex.httpCode == TooManyRequestsException.TOO_MANY_REQUESTS_429);
+ String expectedMessage = "Unable to service the submit job request as templeton service is busy "
+ + "with too many submit job requests. Please wait for some time before "
+ + "retrying the operation. Please refer to the config "
+ + "templeton.parallellism.job.submit to configure concurrent requests.";
+ assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
+
+ /*
+ * Verify that new job requests have no issues.
+ */
+ jobRunnable = SubmitConcurrentJobs(5, config, false, false,
+ submitJobHelper.getDelayedResonseAnswer(4, 0),
+ killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000");
+ assertTrue(jobRunnable.exception == null);
+ } catch (Exception e) {
+ assertTrue(false);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreadsAndTimeout.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreadsAndTimeout.java b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreadsAndTimeout.java
new file mode 100644
index 0000000..ef49cbd
--- /dev/null
+++ b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreadsAndTimeout.java
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.templeton;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.TimeoutException;
+import org.eclipse.jetty.http.HttpStatus;
+
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import static org.junit.Assert.assertTrue;
+
+/*
+ * Test submission of concurrent job requests with the controlled number of concurrent
+ * Requests and job request execution time outs. Verify that we get appropriate exceptions
+ * and exception message.
+ */
+public class TestConcurrentJobRequestsThreadsAndTimeout extends ConcurrentJobRequestsTestBase {
+
+ private static AppConfig config;
+ private static QueueStatusBean statusBean;
+ private static String statusTooManyRequestsExceptionMessage;
+ private static String listTooManyRequestsExceptionMessage;
+ private static String submitTooManyRequestsExceptionMessage;
+
+ @Rule
+ public ExpectedException exception = ExpectedException.none();
+
+ @BeforeClass
+ public static void setUp() {
+ final String[] args = new String[] {};
+ Main main = new Main(args);
+ config = main.getAppConfigInstance();
+ config.setInt(AppConfig.JOB_STATUS_MAX_THREADS, 5);
+ config.setInt(AppConfig.JOB_LIST_MAX_THREADS, 5);
+ config.setInt(AppConfig.JOB_SUBMIT_MAX_THREADS, 5);
+ config.setInt(AppConfig.JOB_SUBMIT_TIMEOUT, 5);
+ config.setInt(AppConfig.JOB_STATUS_TIMEOUT, 5);
+ config.setInt(AppConfig.JOB_LIST_TIMEOUT, 5);
+ config.setInt(AppConfig.JOB_TIMEOUT_TASK_RETRY_COUNT, 4);
+ config.setInt(AppConfig.JOB_TIMEOUT_TASK_RETRY_INTERVAL, 1);
+ statusBean = new QueueStatusBean("job_1000", "Job not found");
+
+ statusTooManyRequestsExceptionMessage = "Unable to service the status job request as "
+ + "templeton service is busy with too many status job requests. "
+ + "Please wait for some time before retrying the operation. "
+ + "Please refer to the config templeton.parallellism.job.status "
+ + "to configure concurrent requests.";
+ listTooManyRequestsExceptionMessage = "Unable to service the list job request as "
+ + "templeton service is busy with too many list job requests. "
+ + "Please wait for some time before retrying the operation. "
+ + "Please refer to the config templeton.parallellism.job.list "
+ + "to configure concurrent requests.";
+ submitTooManyRequestsExceptionMessage = "Unable to service the submit job request as "
+ + "templeton service is busy with too many submit job requests. "
+ + "Please wait for some time before retrying the operation. "
+ + "Please refer to the config templeton.parallellism.job.submit "
+ + "to configure concurrent requests.";
+ }
+
+ @Test
+ public void ConcurrentJobsStatusTooManyRequestsException() {
+ try {
+ JobRunnable jobRunnable = ConcurrentJobsStatus(6, config, false, false,
+ statusJobHelper.getDelayedResonseAnswer(4, statusBean));
+ verifyTooManyRequestsException(jobRunnable.exception, this.statusTooManyRequestsExceptionMessage);
+ } catch (Exception e) {
+ assertTrue(false);
+ }
+ }
+
+ @Test
+ public void ConcurrentListJobsTooManyRequestsException() {
+ try {
+ JobRunnable jobRunnable = ConcurrentListJobs(6, config, false, false,
+ listJobHelper.getDelayedResonseAnswer(4, new ArrayList<JobItemBean>()));
+ verifyTooManyRequestsException(jobRunnable.exception, this.listTooManyRequestsExceptionMessage);
+ } catch (Exception e) {
+ assertTrue(false);
+ }
+ }
+
+ @Test
+ public void ConcurrentSubmitJobsTooManyRequestsException() {
+ try {
+ JobRunnable jobRunnable = SubmitConcurrentJobs(6, config, false, false,
+ submitJobHelper.getDelayedResonseAnswer(4, 0),
+ killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000");
+ verifyTooManyRequestsException(jobRunnable.exception, this.submitTooManyRequestsExceptionMessage);
+ } catch (Exception e) {
+ assertTrue(false);
+ }
+ }
+
+ @Test
+ public void ConcurrentJobsStatusTimeOutException() {
+ try {
+ JobRunnable jobRunnable = ConcurrentJobsStatus(5, config, false, false,
+ statusJobHelper.getDelayedResonseAnswer(6, statusBean));
+ assertTrue(jobRunnable.exception != null);
+ assertTrue(jobRunnable.exception instanceof TimeoutException);
+ String expectedMessage = "Status job request got timed out. Please wait for some time before "
+ + "retrying the operation. Please refer to the config "
+ + "templeton.job.status.timeout to configure job request time out.";
+ assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
+
+ /*
+ * Verify that new job requests should succeed with no issues.
+ */
+ jobRunnable = ConcurrentJobsStatus(5, config, false, false,
+ statusJobHelper.getDelayedResonseAnswer(0, statusBean));
+ assertTrue(jobRunnable.exception == null);
+ } catch (Exception e) {
+ assertTrue(false);
+ }
+ }
+
+ @Test
+ public void ConcurrentListJobsTimeOutException() {
+ try {
+ JobRunnable jobRunnable = ConcurrentListJobs(5, config, false, false,
+ listJobHelper.getDelayedResonseAnswer(6, new ArrayList<JobItemBean>()));
+ assertTrue(jobRunnable.exception != null);
+ assertTrue(jobRunnable.exception instanceof TimeoutException);
+ String expectedMessage = "List job request got timed out. Please wait for some time before "
+ + "retrying the operation. Please refer to the config "
+ + "templeton.job.list.timeout to configure job request time out.";
+
+ assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
+
+ /*
+ * Verify that new job requests should succeed with no issues.
+ */
+ jobRunnable = ConcurrentListJobs(5, config, false, false,
+ listJobHelper.getDelayedResonseAnswer(1, new ArrayList<JobItemBean>()));
+ assertTrue(jobRunnable.exception == null);
+ } catch (Exception e) {
+ assertTrue(false);
+ }
+ }
+
+ @Test
+ public void ConcurrentSubmitJobsTimeOutException() {
+ try {
+ JobRunnable jobRunnable = SubmitConcurrentJobs(5, config, false, false,
+ submitJobHelper.getDelayedResonseAnswer(6, 0),
+ killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000");
+ assertTrue(jobRunnable.exception != null);
+ assertTrue(jobRunnable.exception instanceof QueueException);
+ String expectedMessage = "Submit job request got timed out. Please wait for some time before "
+ + "retrying the operation. Please refer to the config "
+ + "templeton.job.submit.timeout to configure job request time out.";
+ assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
+
+ /*
+ * For submit operation, tasks are not cancelled. Verify that new job request
+ * should fail with TooManyRequestsException.
+ */
+ jobRunnable = SubmitConcurrentJobs(1, config, false, false,
+ submitJobHelper.getDelayedResonseAnswer(0, 0),
+ killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000");
+ verifyTooManyRequestsException(jobRunnable.exception, this.submitTooManyRequestsExceptionMessage);
+
+ /*
+ * Sleep until all threads with clean up tasks are completed.
+ */
+ Thread.sleep(2000);
+
+ /*
+ * Now, tasks would have passed. Verify that new job requests should succeed with no issues.
+ */
+ jobRunnable = SubmitConcurrentJobs(5, config, false, false,
+ submitJobHelper.getDelayedResonseAnswer(0, 0),
+ killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000");
+ assertTrue(jobRunnable.exception == null);
+ } catch (Exception e) {
+ assertTrue(false);
+ }
+ }
+
+ @Test
+ public void ConcurrentStatusJobsVerifyExceptions() {
+ try {
+ /*
+ * Trigger kill threads and verify we get InterruptedException and expected Message.
+ */
+ int timeoutTaskDelay = 4;
+ JobRunnable jobRunnable = ConcurrentJobsStatus(5, config, true, false,
+ statusJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, statusBean));
+ assertTrue(jobRunnable.exception != null);
+ assertTrue(jobRunnable.exception instanceof InterruptedException);
+ String expectedMessage = "Status job request got interrupted. Please wait for some time before "
+ + "retrying the operation.";
+ assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
+
+ /*
+ * Interrupt all thread and verify we get InterruptedException and expected Message.
+ */
+ jobRunnable = ConcurrentJobsStatus(5, config, false, true,
+ statusJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, statusBean));
+ assertTrue(jobRunnable.exception != null);
+ assertTrue(jobRunnable.exception instanceof InterruptedException);
+ assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
+
+ /*
+ * Raise custom exception like IOException and verify expected Message.
+ */
+ jobRunnable = ConcurrentJobsStatus(5, config, false, false,
+ statusJobHelper.getIOExceptionAnswer());
+ assertTrue(jobRunnable.exception != null);
+ assertTrue(jobRunnable.exception.getCause() instanceof IOException);
+
+ /*
+ * Now new job requests should succeed as status operation has no cancel threads.
+ */
+ jobRunnable = ConcurrentJobsStatus(5, config, false, false,
+ statusJobHelper.getDelayedResonseAnswer(0, statusBean));
+ assertTrue(jobRunnable.exception == null);
+ } catch (Exception e) {
+ assertTrue(false);
+ }
+ }
+
+ @Test
+ public void ConcurrentListJobsVerifyExceptions() {
+ try {
+ /*
+ * Trigger kill threads and verify we get InterruptedException and expected Message.
+ */
+ int timeoutTaskDelay = 4;
+ JobRunnable jobRunnable = ConcurrentListJobs(5, config, true, false,
+ listJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, new ArrayList<JobItemBean>()));
+ assertTrue(jobRunnable.exception != null);
+ assertTrue(jobRunnable.exception instanceof InterruptedException);
+ String expectedMessage = "List job request got interrupted. Please wait for some time before "
+ + "retrying the operation.";
+ assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
+
+ /*
+ * Interrupt all thread and verify we get InterruptedException and expected Message.
+ */
+ jobRunnable = ConcurrentListJobs(5, config, false, true,
+ listJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, new ArrayList<JobItemBean>()));
+ assertTrue(jobRunnable.exception != null);
+ assertTrue(jobRunnable.exception instanceof InterruptedException);
+ assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
+
+ /*
+ * Raise custom exception like IOException and verify expected Message.
+ */
+ jobRunnable = ConcurrentListJobs(5, config, false, false,
+ listJobHelper.getIOExceptionAnswer());
+ assertTrue(jobRunnable.exception != null);
+ assertTrue(jobRunnable.exception.getCause() instanceof IOException);
+
+ /*
+ * Now new job requests should succeed as list operation has no cancel threads.
+ */
+ jobRunnable = ConcurrentListJobs(5, config, false, false,
+ listJobHelper.getDelayedResonseAnswer(0, new ArrayList<JobItemBean>()));
+ assertTrue(jobRunnable.exception == null);
+ } catch (Exception e) {
+ assertTrue(false);
+ }
+ }
+
+ @Test
+ public void ConcurrentSubmitJobsVerifyExceptions() {
+ try {
+ int timeoutTaskDelay = 4;
+
+ /*
+ * Raise custom exception like IOException and verify expected Message.
+ * This should not invoke cancel operation.
+ */
+ JobRunnable jobRunnable = SubmitConcurrentJobs(1, config, false, false,
+ submitJobHelper.getIOExceptionAnswer(),
+ killJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, statusBean), "job_1002");
+ assertTrue(jobRunnable.exception != null);
+ assertTrue(jobRunnable.exception instanceof QueueException);
+ assertTrue(jobRunnable.exception.getMessage().contains("IOException raised manually."));
+
+ /*
+ * Raise custom exception like IOException and verify expected Message.
+ * This should not invoke cancel operation.
+ */
+ jobRunnable = SubmitConcurrentJobs(1, config, false, false,
+ submitJobHelper.getOutOfMemoryErrorAnswer(),
+ killJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, statusBean), "job_1003");
+ assertTrue(jobRunnable.exception != null);
+ assertTrue(jobRunnable.exception instanceof QueueException);
+ assertTrue(jobRunnable.exception.getMessage().contains("OutOfMemoryError raised manually."));
+
+ /*
+ * Trigger kill threads and verify that we get InterruptedException and expected
+ * Message. This should raise 3 kill operations and ensure that retries keep the time out
+ * occupied for 4 sec.
+ */
+ jobRunnable = SubmitConcurrentJobs(3, config, true, false,
+ submitJobHelper.getDelayedResonseAnswer(2, 0),
+ killJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, statusBean), "job_1000");
+ assertTrue(jobRunnable.exception != null);
+ assertTrue(jobRunnable.exception instanceof QueueException);
+ String expectedMessage = "Submit job request got interrupted. Please wait for some time "
+ + "before retrying the operation.";
+ assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
+
+ /*
+ * Interrupt all threads and verify we get InterruptedException and expected
+ * Message. Also raise 2 kill operations and ensure that retries keep the time out
+ * occupied for 4 sec.
+ */
+ jobRunnable = SubmitConcurrentJobs(2, config, false, true,
+ submitJobHelper.getDelayedResonseAnswer(2, 0),
+ killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1001");
+ assertTrue(jobRunnable.exception != null);
+ assertTrue(jobRunnable.exception instanceof QueueException);
+ assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
+
+ /*
+ * For submit operation, tasks are not cancelled. Verify that new job request
+ * should fail with TooManyRequestsException.
+ */
+ jobRunnable = SubmitConcurrentJobs(1, config, false, false,
+ submitJobHelper.getDelayedResonseAnswer(0, 0),
+ killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1002");
+ verifyTooManyRequestsException(jobRunnable.exception, this.submitTooManyRequestsExceptionMessage);
+
+ /*
+ * Sleep until all threads with clean up tasks are completed. 2 seconds completing task
+ * and 1 sec grace period.
+ */
+ Thread.sleep((timeoutTaskDelay + 2 + 1) * 1000);
+
+ /*
+ * Now new job requests should succeed as all cancel threads would have completed.
+ */
+ jobRunnable = SubmitConcurrentJobs(5, config, false, false,
+ submitJobHelper.getDelayedResonseAnswer(0, 0),
+ killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1004");
+ assertTrue(jobRunnable.exception == null);
+ } catch (Exception e) {
+ assertTrue(false);
+ }
+ }
+
+ private void verifyTooManyRequestsException(Throwable exception, String expectedMessage) {
+ assertTrue(exception != null);
+ assertTrue(exception instanceof TooManyRequestsException);
+ TooManyRequestsException ex = (TooManyRequestsException)exception;
+ assertTrue(ex.httpCode == TooManyRequestsException.TOO_MANY_REQUESTS_429);
+ assertTrue(exception.getMessage().contains(expectedMessage));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hplsql/pom.xml
----------------------------------------------------------------------
diff --git a/hplsql/pom.xml b/hplsql/pom.xml
index d1337cb..44da8b2 100644
--- a/hplsql/pom.xml
+++ b/hplsql/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hplsql/src/main/java/org/apache/hive/hplsql/Udf.java
----------------------------------------------------------------------
diff --git a/hplsql/src/main/java/org/apache/hive/hplsql/Udf.java b/hplsql/src/main/java/org/apache/hive/hplsql/Udf.java
index 9c29eeb..4901e89 100644
--- a/hplsql/src/main/java/org/apache/hive/hplsql/Udf.java
+++ b/hplsql/src/main/java/org/apache/hive/hplsql/Udf.java
@@ -60,15 +60,7 @@ public class Udf extends GenericUDF {
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
if (exec == null) {
- exec = new Exec();
- String query = queryOI.getPrimitiveJavaObject(arguments[0].get());
- String[] args = { "-e", query, "-trace" };
- try {
- exec.setUdfRun(true);
- exec.init(args);
- } catch (Exception e) {
- throw new HiveException(e.getMessage());
- }
+ initExec(arguments);
}
if (arguments.length > 1) {
setParameters(arguments);
@@ -79,6 +71,22 @@ public class Udf extends GenericUDF {
}
return null;
}
+
+ /**
+ * init exec
+ */
+ public void initExec(DeferredObject[] arguments) throws HiveException {
+ exec = new Exec();
+ exec.enterGlobalScope();
+ String query = queryOI.getPrimitiveJavaObject(arguments[0].get());
+ String[] args = { "-e", query, "-trace" };
+ try {
+ exec.setUdfRun(true);
+ exec.init(args);
+ } catch (Exception e) {
+ throw new HiveException(e.getMessage());
+ }
+ }
/**
* Set parameters for the current call
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/hplsql/src/test/java/org/apache/hive/hplsql/TestHplsqlUdf.java
----------------------------------------------------------------------
diff --git a/hplsql/src/test/java/org/apache/hive/hplsql/TestHplsqlUdf.java b/hplsql/src/test/java/org/apache/hive/hplsql/TestHplsqlUdf.java
new file mode 100644
index 0000000..3896229
--- /dev/null
+++ b/hplsql/src/test/java/org/apache/hive/hplsql/TestHplsqlUdf.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.hplsql;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredJavaObject;
+
+public class TestHplsqlUdf {
+ StringObjectInspector queryOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
+ ObjectInspector argOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
+
+ /**
+ * test evaluate for exec init and setParameters
+ */
+ @Test
+ public void testEvaluateWithoutRun() throws HiveException {
+ // init udf
+ Udf udf = new Udf();
+ ObjectInspector[] initArguments = {queryOI, argOI};
+ udf.initialize(initArguments);
+ //set arguments
+ DeferredObject queryObj = new DeferredJavaObject("hello(:1)");
+ DeferredObject argObj = new DeferredJavaObject("name");
+ DeferredObject[] argumentsObj = {queryObj, argObj};
+
+ // init exec and set parameters, included
+ udf.initExec(argumentsObj);
+ udf.setParameters(argumentsObj);
+
+ // checking var exists and its value is right
+ Var var = udf.exec.findVariable(":1");
+ Assert.assertNotNull(var);
+ String val = (String) var.value;
+ Assert.assertEquals(val, "name");
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/itests/custom-serde/pom.xml
----------------------------------------------------------------------
diff --git a/itests/custom-serde/pom.xml b/itests/custom-serde/pom.xml
index 166ffde..78b68c5 100644
--- a/itests/custom-serde/pom.xml
+++ b/itests/custom-serde/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive-it</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/itests/custom-udfs/pom.xml
----------------------------------------------------------------------
diff --git a/itests/custom-udfs/pom.xml b/itests/custom-udfs/pom.xml
index b230b41..de7df16 100644
--- a/itests/custom-udfs/pom.xml
+++ b/itests/custom-udfs/pom.xml
@@ -19,7 +19,7 @@ limitations under the License.
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive-it</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/itests/custom-udfs/udf-classloader-udf1/pom.xml
----------------------------------------------------------------------
diff --git a/itests/custom-udfs/udf-classloader-udf1/pom.xml b/itests/custom-udfs/udf-classloader-udf1/pom.xml
index 0a95c94..f863efd 100644
--- a/itests/custom-udfs/udf-classloader-udf1/pom.xml
+++ b/itests/custom-udfs/udf-classloader-udf1/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive-it-custom-udfs</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/itests/custom-udfs/udf-classloader-udf2/pom.xml
----------------------------------------------------------------------
diff --git a/itests/custom-udfs/udf-classloader-udf2/pom.xml b/itests/custom-udfs/udf-classloader-udf2/pom.xml
index e3f30f1..2553f3e 100644
--- a/itests/custom-udfs/udf-classloader-udf2/pom.xml
+++ b/itests/custom-udfs/udf-classloader-udf2/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive-it-custom-udfs</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/itests/custom-udfs/udf-classloader-util/pom.xml
----------------------------------------------------------------------
diff --git a/itests/custom-udfs/udf-classloader-util/pom.xml b/itests/custom-udfs/udf-classloader-util/pom.xml
index fe285d7..565a661 100644
--- a/itests/custom-udfs/udf-classloader-util/pom.xml
+++ b/itests/custom-udfs/udf-classloader-util/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive-it-custom-udfs</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/itests/custom-udfs/udf-vectorized-badexample/pom.xml
----------------------------------------------------------------------
diff --git a/itests/custom-udfs/udf-vectorized-badexample/pom.xml b/itests/custom-udfs/udf-vectorized-badexample/pom.xml
index 35c1a2f..6dc923d 100644
--- a/itests/custom-udfs/udf-vectorized-badexample/pom.xml
+++ b/itests/custom-udfs/udf-vectorized-badexample/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive-it-custom-udfs</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/itests/hcatalog-unit/pom.xml
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/pom.xml b/itests/hcatalog-unit/pom.xml
index 3ef87f9..c157aed 100644
--- a/itests/hcatalog-unit/pom.xml
+++ b/itests/hcatalog-unit/pom.xml
@@ -25,7 +25,7 @@
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive-it</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
index 8468b84..93ff498 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.metastore.RawStore;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.AggrStats;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
@@ -915,6 +916,12 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable {
}
@Override
+ public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
+ String tableName) throws MetaException, NoSuchObjectException {
+ return objectStore.getAggrColStatsForTablePartitions(dbName, tableName);
+ }
+
+ @Override
@CanNotRetry
public Boolean commitTransactionExpectDeadlock() {
return null;