You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2017/05/08 20:43:31 UTC
[41/51] [partial] hive git commit: Revert "HIVE-14671 : merge master
into hive-14535 (Wei Zheng)"
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
index 663e4b6..aeb89df 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/PigDelegator.java
@@ -51,7 +51,7 @@ public class PigDelegator extends LauncherDelegator {
boolean usesHcatalog, String completedUrl, boolean enablelog,
Boolean enableJobReconnect)
throws NotAuthorizedException, BadParam, BusyException, QueueException,
- ExecuteException, IOException, InterruptedException, TooManyRequestsException {
+ ExecuteException, IOException, InterruptedException {
runAs = user;
List<String> args = makeArgs(execute,
srcFile, pigArgs,
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SecureProxySupport.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SecureProxySupport.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SecureProxySupport.java
index 793881b..5aed3b3 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SecureProxySupport.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SecureProxySupport.java
@@ -170,7 +170,6 @@ public class SecureProxySupport {
return null;
}
});
- FileSystem.closeAllForUGI(ugi);
return twrapper.tokens;
}
private static void collectTokens(FileSystem fs, TokenWrapper twrapper, Credentials creds, String userName) throws IOException {
@@ -205,7 +204,6 @@ public class SecureProxySupport {
return null;
}
});
- FileSystem.closeAllForUGI(ugi);
}
@@ -222,7 +220,6 @@ public class SecureProxySupport {
return client.getDelegationToken(c.getUser(), u);
}
});
- FileSystem.closeAllForUGI(ugi);
return s;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
index 43a7d57..2da0204 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/Server.java
@@ -27,8 +27,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.servlet.http.HttpServletRequest;
@@ -652,7 +650,7 @@ public class Server {
@FormParam("enablelog") boolean enablelog,
@FormParam("enablejobreconnect") Boolean enablejobreconnect)
throws NotAuthorizedException, BusyException, BadParam, QueueException,
- ExecuteException, IOException, InterruptedException, TooManyRequestsException {
+ ExecuteException, IOException, InterruptedException {
verifyUser();
verifyParam(inputs, "input");
verifyParam(mapper, "mapper");
@@ -706,7 +704,7 @@ public class Server {
@FormParam("enablelog") boolean enablelog,
@FormParam("enablejobreconnect") Boolean enablejobreconnect)
throws NotAuthorizedException, BusyException, BadParam, QueueException,
- ExecuteException, IOException, InterruptedException, TooManyRequestsException {
+ ExecuteException, IOException, InterruptedException {
verifyUser();
verifyParam(jar, "jar");
verifyParam(mainClass, "class");
@@ -756,7 +754,7 @@ public class Server {
@FormParam("enablelog") boolean enablelog,
@FormParam("enablejobreconnect") Boolean enablejobreconnect)
throws NotAuthorizedException, BusyException, BadParam, QueueException,
- ExecuteException, IOException, InterruptedException, TooManyRequestsException {
+ ExecuteException, IOException, InterruptedException {
verifyUser();
if (execute == null && srcFile == null) {
throw new BadParam("Either execute or file parameter required");
@@ -807,7 +805,7 @@ public class Server {
@FormParam("enablelog") boolean enablelog,
@FormParam("enablejobreconnect") Boolean enablejobreconnect)
throws NotAuthorizedException, BusyException, BadParam, QueueException,
- IOException, InterruptedException, TooManyRequestsException {
+ IOException, InterruptedException {
verifyUser();
if (command == null && optionsFile == null)
throw new BadParam("Must define Sqoop command or a optionsfile contains Sqoop command to run Sqoop job.");
@@ -861,7 +859,7 @@ public class Server {
@FormParam("enablelog") boolean enablelog,
@FormParam("enablejobreconnect") Boolean enablejobreconnect)
throws NotAuthorizedException, BusyException, BadParam, QueueException,
- ExecuteException, IOException, InterruptedException, TooManyRequestsException {
+ ExecuteException, IOException, InterruptedException {
verifyUser();
if (execute == null && srcFile == null) {
throw new BadParam("Either execute or file parameter required");
@@ -893,8 +891,7 @@ public class Server {
@Path("jobs/{jobid}")
@Produces({MediaType.APPLICATION_JSON})
public QueueStatusBean showJobId(@PathParam("jobid") String jobid)
- throws NotAuthorizedException, BadParam, IOException, InterruptedException,
- BusyException, TimeoutException, ExecutionException, TooManyRequestsException {
+ throws NotAuthorizedException, BadParam, IOException, InterruptedException {
verifyUser();
verifyParam(jobid, ":jobid");
@@ -971,8 +968,7 @@ public class Server {
@QueryParam("showall") boolean showall,
@QueryParam("jobid") String jobid,
@QueryParam("numrecords") String numrecords)
- throws NotAuthorizedException, BadParam, IOException, InterruptedException,
- BusyException, TimeoutException, ExecutionException, TooManyRequestsException {
+ throws NotAuthorizedException, BadParam, IOException, InterruptedException {
verifyUser();
@@ -984,14 +980,19 @@ public class Server {
showDetails = true;
}
+ ListDelegator ld = new ListDelegator(appConf);
+ List<String> list = ld.run(getDoAsUser(), showall);
+ List<JobItemBean> detailList = new ArrayList<JobItemBean>();
+ int currRecord = 0;
int numRecords;
+
// Parse numrecords to an integer
try {
if (numrecords != null) {
numRecords = Integer.parseInt(numrecords);
- if (numRecords <= 0) {
- throw new BadParam("numrecords should be an integer > 0");
- }
+ if (numRecords <= 0) {
+ throw new BadParam("numrecords should be an integer > 0");
+ }
}
else {
numRecords = -1;
@@ -1001,8 +1002,57 @@ public class Server {
throw new BadParam("Invalid numrecords format: numrecords should be an integer > 0");
}
- ListDelegator ld = new ListDelegator(appConf);
- return ld.run(getDoAsUser(), showall, jobid, numRecords, showDetails);
+ // Sort the list as requested
+ boolean isAscendingOrder = true;
+ switch (appConf.getListJobsOrder()) {
+ case lexicographicaldesc:
+ Collections.sort(list, Collections.reverseOrder());
+ isAscendingOrder = false;
+ break;
+ case lexicographicalasc:
+ default:
+ Collections.sort(list);
+ break;
+ }
+
+ for (String job : list) {
+ // If numRecords = -1, fetch all records.
+ // Hence skip all the below checks when numRecords = -1.
+ if (numRecords != -1) {
+ // If currRecord >= numRecords, we have already fetched the top #numRecords
+ if (currRecord >= numRecords) {
+ break;
+ }
+ else if (jobid == null || jobid.trim().length() == 0) {
+ currRecord++;
+ }
+ // If the current record needs to be returned based on the
+ // filter conditions specified by the user, increment the counter
+ else if (isAscendingOrder && job.compareTo(jobid) > 0 || !isAscendingOrder && job.compareTo(jobid) < 0) {
+ currRecord++;
+ }
+ // The current record should not be included in the output detailList.
+ else {
+ continue;
+ }
+ }
+ JobItemBean jobItem = new JobItemBean();
+ jobItem.id = job;
+ if (showDetails) {
+ StatusDelegator sd = new StatusDelegator(appConf);
+ try {
+ jobItem.detail = sd.run(getDoAsUser(), job);
+ }
+ catch(Exception ex) {
+ /*if we could not get status for some reason, log it, and send empty status back with
+ * just the ID so that caller knows to even look in the log file*/
+ LOG.info("Failed to get status detail for jobId='" + job + "'", ex);
+ jobItem.detail = new QueueStatusBean(job, "Failed to retrieve status; see WebHCat logs");
+ }
+ }
+ detailList.add(jobItem);
+ }
+ return detailList;
}
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java
index eb84fb2..fde5f60 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/SqoopDelegator.java
@@ -50,7 +50,7 @@ public class SqoopDelegator extends LauncherDelegator {
String callback, String completedUrl, boolean enablelog,
Boolean enableJobReconnect, String libdir)
throws NotAuthorizedException, BadParam, BusyException, QueueException,
- IOException, InterruptedException, TooManyRequestsException
+ IOException, InterruptedException
{
if(TempletonUtils.isset(appConf.sqoopArchive())) {
if(!TempletonUtils.isset(appConf.sqoopPath()) && !TempletonUtils.isset(appConf.sqoopHome())) {
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java
index c042ae8..fac0170 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StatusDelegator.java
@@ -19,13 +19,10 @@
package org.apache.hive.hcatalog.templeton;
import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.shims.HadoopShims.WebHCatJTShim;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.mapred.JobID;
@@ -44,78 +41,18 @@ import org.apache.hive.hcatalog.templeton.tool.JobState;
*/
public class StatusDelegator extends TempletonDelegator {
private static final Logger LOG = LoggerFactory.getLogger(StatusDelegator.class);
- private final String JOB_STATUS_EXECUTE_THREAD_PREFIX = "JobStatusExecute";
-
- /**
- * Current thread id used to set in execution threads.
- */
- private final String statusThreadId = Thread.currentThread().getName();
-
- /*
- * Job status request executor to get status of a job.
- */
- private static JobRequestExecutor<QueueStatusBean> jobRequest =
- new JobRequestExecutor<QueueStatusBean>(JobRequestExecutor.JobRequestType.Status,
- AppConfig.JOB_STATUS_MAX_THREADS, AppConfig.JOB_STATUS_TIMEOUT);
public StatusDelegator(AppConfig appConf) {
super(appConf);
}
- /*
- * Gets status of job form job id. If maximum concurrent job status requests are configured
- * then status request will be executed on a thread from thread pool. If job status request
- * time out is configured then request execution thread will be interrupted if thread
- * times out and does no action.
- */
- public QueueStatusBean run(final String user, final String id, boolean enableThreadPool)
- throws NotAuthorizedException, BadParam, IOException, InterruptedException,
- BusyException, TimeoutException, ExecutionException, TooManyRequestsException {
- if (jobRequest.isThreadPoolEnabled() && enableThreadPool) {
- return jobRequest.execute(getJobStatusCallableTask(user, id));
- } else {
- return getJobStatus(user, id);
- }
- }
-
- /*
- * Job callable task for job status operation. Overrides behavior of execute() to get
- * status of a job. No need to override behavior of cleanup() as there is nothing to be
- * done if job sttaus operation is timed out or interrupted.
- */
- private JobCallable<QueueStatusBean> getJobStatusCallableTask(final String user,
- final String id) {
- return new JobCallable<QueueStatusBean>() {
- @Override
- public QueueStatusBean execute() throws NotAuthorizedException, BadParam, IOException,
- InterruptedException, BusyException {
- /*
- * Change the current thread name to include parent thread Id if it is executed
- * in thread pool. Useful to extract logs specific to a job request and helpful
- * to debug job issues.
- */
- Thread.currentThread().setName(String.format("%s-%s-%s", JOB_STATUS_EXECUTE_THREAD_PREFIX,
- statusThreadId, Thread.currentThread().getId()));
-
- return getJobStatus(user, id);
- }
- };
- }
-
- public QueueStatusBean run(final String user, final String id)
- throws NotAuthorizedException, BadParam, IOException, InterruptedException,
- BusyException, TimeoutException, ExecutionException, TooManyRequestsException {
- return run(user, id, true);
- }
-
- public QueueStatusBean getJobStatus(String user, String id)
+ public QueueStatusBean run(String user, String id)
throws NotAuthorizedException, BadParam, IOException, InterruptedException
{
WebHCatJTShim tracker = null;
JobState state = null;
- UserGroupInformation ugi = null;
try {
- ugi = UgiFactory.getUgi(user);
+ UserGroupInformation ugi = UgiFactory.getUgi(user);
tracker = ShimLoader.getHadoopShims().getWebHCatShim(appConf, ugi);
JobID jobid = StatusDelegator.StringToJobID(id);
if (jobid == null)
@@ -129,8 +66,6 @@ public class StatusDelegator extends TempletonDelegator {
tracker.close();
if (state != null)
state.close();
- if (ugi != null)
- FileSystem.closeAllForUGI(ugi);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java
index 590e49f..839b56a 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/StreamingDelegator.java
@@ -51,7 +51,7 @@ public class StreamingDelegator extends LauncherDelegator {
Boolean enableJobReconnect,
JobType jobType)
throws NotAuthorizedException, BadParam, BusyException, QueueException,
- ExecuteException, IOException, InterruptedException, TooManyRequestsException {
+ ExecuteException, IOException, InterruptedException {
List<String> args = makeArgs(inputs, inputreader, output, mapper, reducer, combiner,
fileList, cmdenvs, jarArgs);
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TooManyRequestsException.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TooManyRequestsException.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TooManyRequestsException.java
deleted file mode 100644
index 9d55ad4..0000000
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/TooManyRequestsException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hive.hcatalog.templeton;
-
-/**
- * Raise this exception if web service is busy with existing requests and not able
- * service new requests.
- */
-public class TooManyRequestsException extends SimpleWebException {
- /*
- * The current version of jetty server doesn't have the status
- * HttpStatus.TOO_MANY_REQUESTS_429. Hence, passing this as constant.
- */
- public static int TOO_MANY_REQUESTS_429 = 429;
-
- public TooManyRequestsException(String msg) {
- super(TOO_MANY_REQUESTS_429, msg);
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
index f4c4b76..15ab8b9 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonControllerJob.java
@@ -81,14 +81,9 @@ public class TempletonControllerJob extends Configured implements Tool, JobSubmi
this.appConf = conf;
}
- private Job job = null;
+ private JobID submittedJobId;
public String getSubmittedId() {
- if (job == null ) {
- return null;
- }
-
- JobID submittedJobId = job.getJobID();
if (submittedJobId == null) {
return null;
} else {
@@ -124,7 +119,7 @@ public class TempletonControllerJob extends Configured implements Tool, JobSubmi
String user = UserGroupInformation.getCurrentUser().getShortUserName();
conf.set("user.name", user);
- job = new Job(conf);
+ Job job = new Job(conf);
job.setJarByClass(LaunchMapper.class);
job.setJobName(TempletonControllerJob.class.getSimpleName());
job.setMapperClass(LaunchMapper.class);
@@ -146,7 +141,7 @@ public class TempletonControllerJob extends Configured implements Tool, JobSubmi
job.submit();
- JobID submittedJobId = job.getJobID();
+ submittedJobId = job.getJobID();
if(metastoreTokenStrForm != null) {
//so that it can be cancelled later from CompleteDelegator
DelegationTokenCache.getStringFormTokenCache().storeDelegationToken(
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java
index e0ccc70..07b005b 100644
--- a/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java
+++ b/hcatalog/webhcat/svr/src/main/java/org/apache/hive/hcatalog/templeton/tool/TempletonUtils.java
@@ -362,7 +362,6 @@ public class TempletonUtils {
if (hadoopFsIsMissing(defaultFs, p))
throw new FileNotFoundException("File " + fname + " does not exist.");
- FileSystem.closeAllForUGI(ugi);
return p;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/ConcurrentJobRequestsTestBase.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/ConcurrentJobRequestsTestBase.java b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/ConcurrentJobRequestsTestBase.java
deleted file mode 100644
index 5fcae46..0000000
--- a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/ConcurrentJobRequestsTestBase.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hive.hcatalog.templeton;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.Future;
-
-import org.apache.hive.hcatalog.templeton.tool.TempletonControllerJob;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.mockito.Mockito;
-import org.mockito.stubbing.Answer;
-
-/*
- * Base class for mocking job operations with concurrent requests.
- */
-public class ConcurrentJobRequestsTestBase {
- private static final Logger LOG = LoggerFactory.getLogger(ConcurrentJobRequestsTestBase.class);
- private boolean started = false;
- private Object lock = new Object();
-
- MockAnswerTestHelper<QueueStatusBean> statusJobHelper = new MockAnswerTestHelper<QueueStatusBean>();
- MockAnswerTestHelper<QueueStatusBean> killJobHelper = new MockAnswerTestHelper<QueueStatusBean>();
- MockAnswerTestHelper<List<JobItemBean>> listJobHelper = new MockAnswerTestHelper<List<JobItemBean>>();
- MockAnswerTestHelper<Integer> submitJobHelper = new MockAnswerTestHelper<Integer>();
-
- /*
- * Waits for other threads to join and returns with its Id.
- */
- private int waitForAllThreadsToStart(JobRunnable jobRunnable, int poolThreadCount) {
- int currentId = jobRunnable.threadStartCount.incrementAndGet();
- LOG.info("Waiting for other threads with thread id: " + currentId);
- synchronized(lock) {
- /*
- * We need a total of poolThreadCount + 1 threads to start at same. There are
- * poolThreadCount threads in thread pool and another one which has started them.
- * The thread which sees atomic counter as poolThreadCount+1 is the last thread`
- * to join and wake up all threads to start all at once.
- */
- if (currentId > poolThreadCount) {
- LOG.info("Waking up all threads: " + currentId);
- started = true;
- this.lock.notifyAll();
- } else {
- while (!started) {
- try {
- this.lock.wait();
- } catch (InterruptedException ignore) {
- }
- }
- }
- }
-
- return currentId;
- }
-
- public JobRunnable ConcurrentJobsStatus(final int threadCount, AppConfig appConfig,
- final boolean killThreads, boolean interruptThreads, final Answer<QueueStatusBean> answer)
- throws IOException, InterruptedException, QueueException, NotAuthorizedException,
- BadParam, BusyException {
-
- StatusDelegator delegator = new StatusDelegator(appConfig);
- final StatusDelegator mockDelegator = Mockito.spy(delegator);
-
- Mockito.doAnswer(answer).when(mockDelegator).getJobStatus(Mockito.any(String.class),
- Mockito.any(String.class));
-
- JobRunnable statusJobRunnable = new JobRunnable() {
- @Override
- public void run() {
- try {
- int threadId = waitForAllThreadsToStart(this, threadCount);
- LOG.info("Started executing Job Status operation. ThreadId : " + threadId);
- mockDelegator.run("admin", "job_1000" + threadId);
- } catch (Exception ex) {
- exception = ex;
- }
- }
- };
-
- executeJobOperations(statusJobRunnable, threadCount, killThreads, interruptThreads);
- return statusJobRunnable;
- }
-
- public JobRunnable ConcurrentListJobs(final int threadCount, AppConfig config,
- final boolean killThreads, boolean interruptThreads, final Answer<List<JobItemBean>> answer)
- throws IOException, InterruptedException, QueueException, NotAuthorizedException,
- BadParam, BusyException {
-
- ListDelegator delegator = new ListDelegator(config);
- final ListDelegator mockDelegator = Mockito.spy(delegator);
-
- Mockito.doAnswer(answer).when(mockDelegator).listJobs(Mockito.any(String.class),
- Mockito.any(boolean.class), Mockito.any(String.class),
- Mockito.any(int.class), Mockito.any(boolean.class));
-
- JobRunnable listJobRunnable = new JobRunnable() {
- @Override
- public void run() {
- try {
- int threadId = waitForAllThreadsToStart(this, threadCount);
- LOG.info("Started executing Job List operation. ThreadId : " + threadId);
- mockDelegator.run("admin", true, "", 10, true);
- } catch (Exception ex) {
- exception = ex;
- }
- }
- };
-
- executeJobOperations(listJobRunnable, threadCount, killThreads, interruptThreads);
- return listJobRunnable;
- }
-
- public JobRunnable SubmitConcurrentJobs(final int threadCount, AppConfig config,
- final boolean killThreads, boolean interruptThreads, final Answer<Integer> responseAnswer,
- final Answer<QueueStatusBean> timeoutResponseAnswer, final String jobIdResponse)
- throws IOException, InterruptedException, QueueException, NotAuthorizedException,
- BusyException, TimeoutException, Exception {
-
- LauncherDelegator delegator = new LauncherDelegator(config);
- final LauncherDelegator mockDelegator = Mockito.spy(delegator);
- final List<String> listArgs = new ArrayList<String>();
-
- TempletonControllerJob mockCtrl = Mockito.mock(TempletonControllerJob.class);
-
- Mockito.doReturn(jobIdResponse).when(mockCtrl).getSubmittedId();
-
- Mockito.doReturn(mockCtrl).when(mockDelegator).getTempletonController();
-
- Mockito.doAnswer(responseAnswer).when(mockDelegator).runTempletonControllerJob(
- Mockito.any(TempletonControllerJob.class), Mockito.any(List.class));
-
- Mockito.doAnswer(timeoutResponseAnswer).when(mockDelegator).killJob(
- Mockito.any(String.class), Mockito.any(String.class));
-
- Mockito.doNothing().when(mockDelegator).registerJob(Mockito.any(String.class),
- Mockito.any(String.class), Mockito.any(String.class), Mockito.any(Map.class));
-
- JobRunnable submitJobRunnable = new JobRunnable() {
- @Override
- public void run() {
- try {
- int threadId = waitForAllThreadsToStart(this, threadCount);
- LOG.info("Started executing Job Submit operation. ThreadId : " + threadId);
- mockDelegator.enqueueController("admin", null, "", listArgs);
- } catch (Throwable ex) {
- exception = ex;
- }
- }
- };
-
- executeJobOperations(submitJobRunnable, threadCount, killThreads, interruptThreads);
- return submitJobRunnable;
- }
-
- public void executeJobOperations(JobRunnable jobRunnable, int threadCount, boolean killThreads,
- boolean interruptThreads)
- throws IOException, InterruptedException, QueueException, NotAuthorizedException {
-
- started = false;
-
- ExecutorService executorService = new ThreadPoolExecutor(threadCount, threadCount, 0L,
- TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());;
-
- ArrayList<Future<?>> futures = new ArrayList<Future<?>>();
- for (int i = 0; i < threadCount; i++) {
- futures.add(executorService.submit(jobRunnable));
- }
-
- waitForAllThreadsToStart(jobRunnable, threadCount);
- LOG.info("Started all threads ");
-
- if (killThreads) {
- executorService.shutdownNow();
- } else {
- if (interruptThreads){
- for (Future<?> future : futures) {
- LOG.info("Cancelling the thread");
- future.cancel(true);
- }
- }
-
- executorService.shutdown();
- }
-
- /*
- * For both graceful or forceful shutdown, wait for tasks to terminate such that
- * appropriate exceptions are raised and stored in JobRunnable.exception.
- */
- if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
- LOG.info("Force Shutting down the pool\n");
- if (!killThreads) {
- /*
- * killThreads option has already done force shutdown. No need to do again.
- */
- executorService.shutdownNow();
- }
- }
- }
-
- public abstract class JobRunnable implements Runnable {
- public volatile Throwable exception = null;
- public AtomicInteger threadStartCount = new AtomicInteger(0);
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/MockAnswerTestHelper.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/MockAnswerTestHelper.java b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/MockAnswerTestHelper.java
deleted file mode 100644
index 9f1744e..0000000
--- a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/MockAnswerTestHelper.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hive.hcatalog.templeton;
-
-import java.io.IOException;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-/*
- * Helper class to generate mocked response.
- */
-public class MockAnswerTestHelper<T> {
- public Answer<T> getIOExceptionAnswer() {
- return new Answer<T>() {
- @Override
- public T answer(InvocationOnMock invocation) throws Exception {
- throw new IOException("IOException raised manually.");
- }
- };
- }
-
- public Answer<T> getOutOfMemoryErrorAnswer() {
- return new Answer<T>() {
- @Override
- public T answer(InvocationOnMock invocation) throws OutOfMemoryError {
- throw new OutOfMemoryError("OutOfMemoryError raised manually.");
- }
- };
- }
-
- public Answer<T> getDelayedResonseAnswer(final int delayInSeconds, final T response) {
- return new Answer<T>() {
- @Override
- public T answer(InvocationOnMock invocation) throws InterruptedException {
- Thread.sleep(1000 * delayInSeconds);
- return response;
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequests.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequests.java b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequests.java
deleted file mode 100644
index 695dcc6..0000000
--- a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequests.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hive.hcatalog.templeton;
-
-import java.util.ArrayList;
-
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import static org.junit.Assert.assertTrue;
-
-/*
- * Test submission of concurrent job requests.
- */
-public class TestConcurrentJobRequests extends ConcurrentJobRequestsTestBase {
-
- private static AppConfig config;
-
- @Rule
- public ExpectedException exception = ExpectedException.none();
-
- @BeforeClass
- public static void setUp() {
- final String[] args = new String[] {};
- Main main = new Main(args);
- config = main.getAppConfigInstance();
- }
-
- @Test
- public void ConcurrentJobsStatusSuccess() {
- try {
- JobRunnable jobRunnable = ConcurrentJobsStatus(6, config, false, false,
- statusJobHelper.getDelayedResonseAnswer(4, new QueueStatusBean("job_1000", "Job not found")));
- assertTrue(jobRunnable.exception == null);
- } catch (Exception e) {
- assertTrue(false);
- }
- }
-
- @Test
- public void ConcurrentListJobsSuccess() {
- try {
- JobRunnable jobRunnable = ConcurrentListJobs(6, config, false, false,
- listJobHelper.getDelayedResonseAnswer(4, new ArrayList<JobItemBean>()));
- assertTrue(jobRunnable.exception == null);
- } catch (Exception e) {
- assertTrue(false);
- }
- }
-
- @Test
- public void ConcurrentSubmitJobsSuccess() {
- try {
- JobRunnable jobRunnable = SubmitConcurrentJobs(6, config, false, false,
- submitJobHelper.getDelayedResonseAnswer(4, 0),
- killJobHelper.getDelayedResonseAnswer(4, null), "job_1000");
- assertTrue(jobRunnable.exception == null);
- } catch (Exception e) {
- assertTrue(false);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreads.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreads.java b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreads.java
deleted file mode 100644
index 6f8da40..0000000
--- a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreads.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hive.hcatalog.templeton;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.concurrent.TimeoutException;
-import org.eclipse.jetty.http.HttpStatus;
-
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import static org.junit.Assert.assertTrue;
-
-/*
- * Test submission of concurrent job requests with the controlled number of concurrent
- * Requests. Verify that we get busy exception and appropriate message.
- */
-public class TestConcurrentJobRequestsThreads extends ConcurrentJobRequestsTestBase {
-
- private static AppConfig config;
- private static QueueStatusBean statusBean;
-
- @Rule
- public ExpectedException exception = ExpectedException.none();
-
- @BeforeClass
- public static void setUp() {
- final String[] args = new String[] {};
- Main main = new Main(args);
- config = main.getAppConfigInstance();
- config.setInt(AppConfig.JOB_STATUS_MAX_THREADS, 5);
- config.setInt(AppConfig.JOB_LIST_MAX_THREADS, 5);
- config.setInt(AppConfig.JOB_SUBMIT_MAX_THREADS, 5);
- statusBean = new QueueStatusBean("job_1000", "Job not found");
- }
-
- @Test
- public void ConcurrentJobsStatusTooManyRequestsException() {
- try {
- JobRunnable jobRunnable = ConcurrentJobsStatus(6, config, false, false,
- statusJobHelper.getDelayedResonseAnswer(4, statusBean));
- assertTrue(jobRunnable.exception != null);
- assertTrue(jobRunnable.exception instanceof TooManyRequestsException);
- TooManyRequestsException ex = (TooManyRequestsException)jobRunnable.exception;
- assertTrue(ex.httpCode == TooManyRequestsException.TOO_MANY_REQUESTS_429);
- String expectedMessage = "Unable to service the status job request as templeton service is busy "
- + "with too many status job requests. Please wait for some time before "
- + "retrying the operation. Please refer to the config "
- + "templeton.parallellism.job.status to configure concurrent requests.";
- assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
-
- /*
- * Verify that new job requests have no issues.
- */
- jobRunnable = ConcurrentJobsStatus(5, config, false, false,
- statusJobHelper.getDelayedResonseAnswer(4, statusBean));
- assertTrue(jobRunnable.exception == null);
- } catch (Exception e) {
- assertTrue(false);
- }
- }
-
- @Test
- public void ConcurrentListJobsTooManyRequestsException() {
- try {
- JobRunnable jobRunnable = ConcurrentListJobs(6, config, false, false,
- listJobHelper.getDelayedResonseAnswer(4, new ArrayList<JobItemBean>()));
- assertTrue(jobRunnable.exception != null);
- assertTrue(jobRunnable.exception instanceof TooManyRequestsException);
- TooManyRequestsException ex = (TooManyRequestsException)jobRunnable.exception;
- assertTrue(ex.httpCode == TooManyRequestsException.TOO_MANY_REQUESTS_429);
- String expectedMessage = "Unable to service the list job request as templeton service is busy "
- + "with too many list job requests. Please wait for some time before "
- + "retrying the operation. Please refer to the config "
- + "templeton.parallellism.job.list to configure concurrent requests.";
- assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
-
- /*
- * Verify that new job requests have no issues.
- */
- jobRunnable = ConcurrentListJobs(5, config, false, false,
- listJobHelper.getDelayedResonseAnswer(4, new ArrayList<JobItemBean>()));
- assertTrue(jobRunnable.exception == null);
- } catch (Exception e) {
- assertTrue(false);
- }
- }
-
- @Test
- public void ConcurrentSubmitJobsTooManyRequestsException() {
- try {
- JobRunnable jobRunnable = SubmitConcurrentJobs(6, config, false, false,
- submitJobHelper.getDelayedResonseAnswer(4, 0),
- killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000");
- assertTrue(jobRunnable.exception != null);
- assertTrue(jobRunnable.exception instanceof TooManyRequestsException);
- TooManyRequestsException ex = (TooManyRequestsException)jobRunnable.exception;
- assertTrue(ex.httpCode == TooManyRequestsException.TOO_MANY_REQUESTS_429);
- String expectedMessage = "Unable to service the submit job request as templeton service is busy "
- + "with too many submit job requests. Please wait for some time before "
- + "retrying the operation. Please refer to the config "
- + "templeton.parallellism.job.submit to configure concurrent requests.";
- assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
-
- /*
- * Verify that new job requests have no issues.
- */
- jobRunnable = SubmitConcurrentJobs(5, config, false, false,
- submitJobHelper.getDelayedResonseAnswer(4, 0),
- killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000");
- assertTrue(jobRunnable.exception == null);
- } catch (Exception e) {
- assertTrue(false);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreadsAndTimeout.java
----------------------------------------------------------------------
diff --git a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreadsAndTimeout.java b/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreadsAndTimeout.java
deleted file mode 100644
index ef49cbd..0000000
--- a/hcatalog/webhcat/svr/src/test/java/org/apache/hive/hcatalog/templeton/TestConcurrentJobRequestsThreadsAndTimeout.java
+++ /dev/null
@@ -1,374 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hive.hcatalog.templeton;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.concurrent.TimeoutException;
-import org.eclipse.jetty.http.HttpStatus;
-
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import static org.junit.Assert.assertTrue;
-
-/*
- * Test submission of concurrent job requests with the controlled number of concurrent
- * Requests and job request execution time outs. Verify that we get appropriate exceptions
- * and exception message.
- */
-public class TestConcurrentJobRequestsThreadsAndTimeout extends ConcurrentJobRequestsTestBase {
-
- private static AppConfig config;
- private static QueueStatusBean statusBean;
- private static String statusTooManyRequestsExceptionMessage;
- private static String listTooManyRequestsExceptionMessage;
- private static String submitTooManyRequestsExceptionMessage;
-
- @Rule
- public ExpectedException exception = ExpectedException.none();
-
- @BeforeClass
- public static void setUp() {
- final String[] args = new String[] {};
- Main main = new Main(args);
- config = main.getAppConfigInstance();
- config.setInt(AppConfig.JOB_STATUS_MAX_THREADS, 5);
- config.setInt(AppConfig.JOB_LIST_MAX_THREADS, 5);
- config.setInt(AppConfig.JOB_SUBMIT_MAX_THREADS, 5);
- config.setInt(AppConfig.JOB_SUBMIT_TIMEOUT, 5);
- config.setInt(AppConfig.JOB_STATUS_TIMEOUT, 5);
- config.setInt(AppConfig.JOB_LIST_TIMEOUT, 5);
- config.setInt(AppConfig.JOB_TIMEOUT_TASK_RETRY_COUNT, 4);
- config.setInt(AppConfig.JOB_TIMEOUT_TASK_RETRY_INTERVAL, 1);
- statusBean = new QueueStatusBean("job_1000", "Job not found");
-
- statusTooManyRequestsExceptionMessage = "Unable to service the status job request as "
- + "templeton service is busy with too many status job requests. "
- + "Please wait for some time before retrying the operation. "
- + "Please refer to the config templeton.parallellism.job.status "
- + "to configure concurrent requests.";
- listTooManyRequestsExceptionMessage = "Unable to service the list job request as "
- + "templeton service is busy with too many list job requests. "
- + "Please wait for some time before retrying the operation. "
- + "Please refer to the config templeton.parallellism.job.list "
- + "to configure concurrent requests.";
- submitTooManyRequestsExceptionMessage = "Unable to service the submit job request as "
- + "templeton service is busy with too many submit job requests. "
- + "Please wait for some time before retrying the operation. "
- + "Please refer to the config templeton.parallellism.job.submit "
- + "to configure concurrent requests.";
- }
-
- @Test
- public void ConcurrentJobsStatusTooManyRequestsException() {
- try {
- JobRunnable jobRunnable = ConcurrentJobsStatus(6, config, false, false,
- statusJobHelper.getDelayedResonseAnswer(4, statusBean));
- verifyTooManyRequestsException(jobRunnable.exception, this.statusTooManyRequestsExceptionMessage);
- } catch (Exception e) {
- assertTrue(false);
- }
- }
-
- @Test
- public void ConcurrentListJobsTooManyRequestsException() {
- try {
- JobRunnable jobRunnable = ConcurrentListJobs(6, config, false, false,
- listJobHelper.getDelayedResonseAnswer(4, new ArrayList<JobItemBean>()));
- verifyTooManyRequestsException(jobRunnable.exception, this.listTooManyRequestsExceptionMessage);
- } catch (Exception e) {
- assertTrue(false);
- }
- }
-
- @Test
- public void ConcurrentSubmitJobsTooManyRequestsException() {
- try {
- JobRunnable jobRunnable = SubmitConcurrentJobs(6, config, false, false,
- submitJobHelper.getDelayedResonseAnswer(4, 0),
- killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000");
- verifyTooManyRequestsException(jobRunnable.exception, this.submitTooManyRequestsExceptionMessage);
- } catch (Exception e) {
- assertTrue(false);
- }
- }
-
- @Test
- public void ConcurrentJobsStatusTimeOutException() {
- try {
- JobRunnable jobRunnable = ConcurrentJobsStatus(5, config, false, false,
- statusJobHelper.getDelayedResonseAnswer(6, statusBean));
- assertTrue(jobRunnable.exception != null);
- assertTrue(jobRunnable.exception instanceof TimeoutException);
- String expectedMessage = "Status job request got timed out. Please wait for some time before "
- + "retrying the operation. Please refer to the config "
- + "templeton.job.status.timeout to configure job request time out.";
- assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
-
- /*
- * Verify that new job requests should succeed with no issues.
- */
- jobRunnable = ConcurrentJobsStatus(5, config, false, false,
- statusJobHelper.getDelayedResonseAnswer(0, statusBean));
- assertTrue(jobRunnable.exception == null);
- } catch (Exception e) {
- assertTrue(false);
- }
- }
-
- @Test
- public void ConcurrentListJobsTimeOutException() {
- try {
- JobRunnable jobRunnable = ConcurrentListJobs(5, config, false, false,
- listJobHelper.getDelayedResonseAnswer(6, new ArrayList<JobItemBean>()));
- assertTrue(jobRunnable.exception != null);
- assertTrue(jobRunnable.exception instanceof TimeoutException);
- String expectedMessage = "List job request got timed out. Please wait for some time before "
- + "retrying the operation. Please refer to the config "
- + "templeton.job.list.timeout to configure job request time out.";
-
- assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
-
- /*
- * Verify that new job requests should succeed with no issues.
- */
- jobRunnable = ConcurrentListJobs(5, config, false, false,
- listJobHelper.getDelayedResonseAnswer(1, new ArrayList<JobItemBean>()));
- assertTrue(jobRunnable.exception == null);
- } catch (Exception e) {
- assertTrue(false);
- }
- }
-
- @Test
- public void ConcurrentSubmitJobsTimeOutException() {
- try {
- JobRunnable jobRunnable = SubmitConcurrentJobs(5, config, false, false,
- submitJobHelper.getDelayedResonseAnswer(6, 0),
- killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000");
- assertTrue(jobRunnable.exception != null);
- assertTrue(jobRunnable.exception instanceof QueueException);
- String expectedMessage = "Submit job request got timed out. Please wait for some time before "
- + "retrying the operation. Please refer to the config "
- + "templeton.job.submit.timeout to configure job request time out.";
- assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
-
- /*
- * For submit operation, tasks are not cancelled. Verify that new job request
- * should fail with TooManyRequestsException.
- */
- jobRunnable = SubmitConcurrentJobs(1, config, false, false,
- submitJobHelper.getDelayedResonseAnswer(0, 0),
- killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000");
- verifyTooManyRequestsException(jobRunnable.exception, this.submitTooManyRequestsExceptionMessage);
-
- /*
- * Sleep until all threads with clean up tasks are completed.
- */
- Thread.sleep(2000);
-
- /*
- * Now, tasks would have passed. Verify that new job requests should succeed with no issues.
- */
- jobRunnable = SubmitConcurrentJobs(5, config, false, false,
- submitJobHelper.getDelayedResonseAnswer(0, 0),
- killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1000");
- assertTrue(jobRunnable.exception == null);
- } catch (Exception e) {
- assertTrue(false);
- }
- }
-
- @Test
- public void ConcurrentStatusJobsVerifyExceptions() {
- try {
- /*
- * Trigger kill threads and verify we get InterruptedException and expected Message.
- */
- int timeoutTaskDelay = 4;
- JobRunnable jobRunnable = ConcurrentJobsStatus(5, config, true, false,
- statusJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, statusBean));
- assertTrue(jobRunnable.exception != null);
- assertTrue(jobRunnable.exception instanceof InterruptedException);
- String expectedMessage = "Status job request got interrupted. Please wait for some time before "
- + "retrying the operation.";
- assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
-
- /*
- * Interrupt all thread and verify we get InterruptedException and expected Message.
- */
- jobRunnable = ConcurrentJobsStatus(5, config, false, true,
- statusJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, statusBean));
- assertTrue(jobRunnable.exception != null);
- assertTrue(jobRunnable.exception instanceof InterruptedException);
- assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
-
- /*
- * Raise custom exception like IOException and verify expected Message.
- */
- jobRunnable = ConcurrentJobsStatus(5, config, false, false,
- statusJobHelper.getIOExceptionAnswer());
- assertTrue(jobRunnable.exception != null);
- assertTrue(jobRunnable.exception.getCause() instanceof IOException);
-
- /*
- * Now new job requests should succeed as status operation has no cancel threads.
- */
- jobRunnable = ConcurrentJobsStatus(5, config, false, false,
- statusJobHelper.getDelayedResonseAnswer(0, statusBean));
- assertTrue(jobRunnable.exception == null);
- } catch (Exception e) {
- assertTrue(false);
- }
- }
-
- @Test
- public void ConcurrentListJobsVerifyExceptions() {
- try {
- /*
- * Trigger kill threads and verify we get InterruptedException and expected Message.
- */
- int timeoutTaskDelay = 4;
- JobRunnable jobRunnable = ConcurrentListJobs(5, config, true, false,
- listJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, new ArrayList<JobItemBean>()));
- assertTrue(jobRunnable.exception != null);
- assertTrue(jobRunnable.exception instanceof InterruptedException);
- String expectedMessage = "List job request got interrupted. Please wait for some time before "
- + "retrying the operation.";
- assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
-
- /*
- * Interrupt all thread and verify we get InterruptedException and expected Message.
- */
- jobRunnable = ConcurrentListJobs(5, config, false, true,
- listJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, new ArrayList<JobItemBean>()));
- assertTrue(jobRunnable.exception != null);
- assertTrue(jobRunnable.exception instanceof InterruptedException);
- assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
-
- /*
- * Raise custom exception like IOException and verify expected Message.
- */
- jobRunnable = ConcurrentListJobs(5, config, false, false,
- listJobHelper.getIOExceptionAnswer());
- assertTrue(jobRunnable.exception != null);
- assertTrue(jobRunnable.exception.getCause() instanceof IOException);
-
- /*
- * Now new job requests should succeed as list operation has no cancel threads.
- */
- jobRunnable = ConcurrentListJobs(5, config, false, false,
- listJobHelper.getDelayedResonseAnswer(0, new ArrayList<JobItemBean>()));
- assertTrue(jobRunnable.exception == null);
- } catch (Exception e) {
- assertTrue(false);
- }
- }
-
- @Test
- public void ConcurrentSubmitJobsVerifyExceptions() {
- try {
- int timeoutTaskDelay = 4;
-
- /*
- * Raise custom exception like IOException and verify expected Message.
- * This should not invoke cancel operation.
- */
- JobRunnable jobRunnable = SubmitConcurrentJobs(1, config, false, false,
- submitJobHelper.getIOExceptionAnswer(),
- killJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, statusBean), "job_1002");
- assertTrue(jobRunnable.exception != null);
- assertTrue(jobRunnable.exception instanceof QueueException);
- assertTrue(jobRunnable.exception.getMessage().contains("IOException raised manually."));
-
- /*
- * Raise custom exception like IOException and verify expected Message.
- * This should not invoke cancel operation.
- */
- jobRunnable = SubmitConcurrentJobs(1, config, false, false,
- submitJobHelper.getOutOfMemoryErrorAnswer(),
- killJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, statusBean), "job_1003");
- assertTrue(jobRunnable.exception != null);
- assertTrue(jobRunnable.exception instanceof QueueException);
- assertTrue(jobRunnable.exception.getMessage().contains("OutOfMemoryError raised manually."));
-
- /*
- * Trigger kill threads and verify that we get InterruptedException and expected
- * Message. This should raise 3 kill operations and ensure that retries keep the time out
- * occupied for 4 sec.
- */
- jobRunnable = SubmitConcurrentJobs(3, config, true, false,
- submitJobHelper.getDelayedResonseAnswer(2, 0),
- killJobHelper.getDelayedResonseAnswer(timeoutTaskDelay, statusBean), "job_1000");
- assertTrue(jobRunnable.exception != null);
- assertTrue(jobRunnable.exception instanceof QueueException);
- String expectedMessage = "Submit job request got interrupted. Please wait for some time "
- + "before retrying the operation.";
- assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
-
- /*
- * Interrupt all threads and verify we get InterruptedException and expected
- * Message. Also raise 2 kill operations and ensure that retries keep the time out
- * occupied for 4 sec.
- */
- jobRunnable = SubmitConcurrentJobs(2, config, false, true,
- submitJobHelper.getDelayedResonseAnswer(2, 0),
- killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1001");
- assertTrue(jobRunnable.exception != null);
- assertTrue(jobRunnable.exception instanceof QueueException);
- assertTrue(jobRunnable.exception.getMessage().contains(expectedMessage));
-
- /*
- * For submit operation, tasks are not cancelled. Verify that new job request
- * should fail with TooManyRequestsException.
- */
- jobRunnable = SubmitConcurrentJobs(1, config, false, false,
- submitJobHelper.getDelayedResonseAnswer(0, 0),
- killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1002");
- verifyTooManyRequestsException(jobRunnable.exception, this.submitTooManyRequestsExceptionMessage);
-
- /*
- * Sleep until all threads with clean up tasks are completed. 2 seconds completing task
- * and 1 sec grace period.
- */
- Thread.sleep((timeoutTaskDelay + 2 + 1) * 1000);
-
- /*
- * Now new job requests should succeed as all cancel threads would have completed.
- */
- jobRunnable = SubmitConcurrentJobs(5, config, false, false,
- submitJobHelper.getDelayedResonseAnswer(0, 0),
- killJobHelper.getDelayedResonseAnswer(0, statusBean), "job_1004");
- assertTrue(jobRunnable.exception == null);
- } catch (Exception e) {
- assertTrue(false);
- }
- }
-
- private void verifyTooManyRequestsException(Throwable exception, String expectedMessage) {
- assertTrue(exception != null);
- assertTrue(exception instanceof TooManyRequestsException);
- TooManyRequestsException ex = (TooManyRequestsException)exception;
- assertTrue(ex.httpCode == TooManyRequestsException.TOO_MANY_REQUESTS_429);
- assertTrue(exception.getMessage().contains(expectedMessage));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hplsql/pom.xml
----------------------------------------------------------------------
diff --git a/hplsql/pom.xml b/hplsql/pom.xml
index 44da8b2..d1337cb 100644
--- a/hplsql/pom.xml
+++ b/hplsql/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive</artifactId>
- <version>3.0.0-SNAPSHOT</version>
+ <version>2.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hplsql/src/main/java/org/apache/hive/hplsql/Udf.java
----------------------------------------------------------------------
diff --git a/hplsql/src/main/java/org/apache/hive/hplsql/Udf.java b/hplsql/src/main/java/org/apache/hive/hplsql/Udf.java
index 4901e89..9c29eeb 100644
--- a/hplsql/src/main/java/org/apache/hive/hplsql/Udf.java
+++ b/hplsql/src/main/java/org/apache/hive/hplsql/Udf.java
@@ -60,7 +60,15 @@ public class Udf extends GenericUDF {
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
if (exec == null) {
- initExec(arguments);
+ exec = new Exec();
+ String query = queryOI.getPrimitiveJavaObject(arguments[0].get());
+ String[] args = { "-e", query, "-trace" };
+ try {
+ exec.setUdfRun(true);
+ exec.init(args);
+ } catch (Exception e) {
+ throw new HiveException(e.getMessage());
+ }
}
if (arguments.length > 1) {
setParameters(arguments);
@@ -71,22 +79,6 @@ public class Udf extends GenericUDF {
}
return null;
}
-
- /**
- * init exec
- */
- public void initExec(DeferredObject[] arguments) throws HiveException {
- exec = new Exec();
- exec.enterGlobalScope();
- String query = queryOI.getPrimitiveJavaObject(arguments[0].get());
- String[] args = { "-e", query, "-trace" };
- try {
- exec.setUdfRun(true);
- exec.init(args);
- } catch (Exception e) {
- throw new HiveException(e.getMessage());
- }
- }
/**
* Set parameters for the current call
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/hplsql/src/test/java/org/apache/hive/hplsql/TestHplsqlUdf.java
----------------------------------------------------------------------
diff --git a/hplsql/src/test/java/org/apache/hive/hplsql/TestHplsqlUdf.java b/hplsql/src/test/java/org/apache/hive/hplsql/TestHplsqlUdf.java
deleted file mode 100644
index 3896229..0000000
--- a/hplsql/src/test/java/org/apache/hive/hplsql/TestHplsqlUdf.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hive.hplsql;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredJavaObject;
-
-public class TestHplsqlUdf {
- StringObjectInspector queryOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
- ObjectInspector argOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
-
- /**
- * test evaluate for exec init and setParameters
- */
- @Test
- public void testEvaluateWithoutRun() throws HiveException {
- // init udf
- Udf udf = new Udf();
- ObjectInspector[] initArguments = {queryOI, argOI};
- udf.initialize(initArguments);
- //set arguments
- DeferredObject queryObj = new DeferredJavaObject("hello(:1)");
- DeferredObject argObj = new DeferredJavaObject("name");
- DeferredObject[] argumentsObj = {queryObj, argObj};
-
- // init exec and set parameters, included
- udf.initExec(argumentsObj);
- udf.setParameters(argumentsObj);
-
- // checking var exists and its value is right
- Var var = udf.exec.findVariable(":1");
- Assert.assertNotNull(var);
- String val = (String) var.value;
- Assert.assertEquals(val, "name");
- }
-}
-
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/itests/custom-serde/pom.xml
----------------------------------------------------------------------
diff --git a/itests/custom-serde/pom.xml b/itests/custom-serde/pom.xml
index 78b68c5..166ffde 100644
--- a/itests/custom-serde/pom.xml
+++ b/itests/custom-serde/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive-it</artifactId>
- <version>3.0.0-SNAPSHOT</version>
+ <version>2.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/itests/custom-udfs/pom.xml
----------------------------------------------------------------------
diff --git a/itests/custom-udfs/pom.xml b/itests/custom-udfs/pom.xml
index de7df16..b230b41 100644
--- a/itests/custom-udfs/pom.xml
+++ b/itests/custom-udfs/pom.xml
@@ -19,7 +19,7 @@ limitations under the License.
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive-it</artifactId>
- <version>3.0.0-SNAPSHOT</version>
+ <version>2.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/itests/custom-udfs/udf-classloader-udf1/pom.xml
----------------------------------------------------------------------
diff --git a/itests/custom-udfs/udf-classloader-udf1/pom.xml b/itests/custom-udfs/udf-classloader-udf1/pom.xml
index f863efd..0a95c94 100644
--- a/itests/custom-udfs/udf-classloader-udf1/pom.xml
+++ b/itests/custom-udfs/udf-classloader-udf1/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive-it-custom-udfs</artifactId>
- <version>3.0.0-SNAPSHOT</version>
+ <version>2.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/itests/custom-udfs/udf-classloader-udf2/pom.xml
----------------------------------------------------------------------
diff --git a/itests/custom-udfs/udf-classloader-udf2/pom.xml b/itests/custom-udfs/udf-classloader-udf2/pom.xml
index 2553f3e..e3f30f1 100644
--- a/itests/custom-udfs/udf-classloader-udf2/pom.xml
+++ b/itests/custom-udfs/udf-classloader-udf2/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive-it-custom-udfs</artifactId>
- <version>3.0.0-SNAPSHOT</version>
+ <version>2.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/itests/custom-udfs/udf-classloader-util/pom.xml
----------------------------------------------------------------------
diff --git a/itests/custom-udfs/udf-classloader-util/pom.xml b/itests/custom-udfs/udf-classloader-util/pom.xml
index 565a661..fe285d7 100644
--- a/itests/custom-udfs/udf-classloader-util/pom.xml
+++ b/itests/custom-udfs/udf-classloader-util/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive-it-custom-udfs</artifactId>
- <version>3.0.0-SNAPSHOT</version>
+ <version>2.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/itests/custom-udfs/udf-vectorized-badexample/pom.xml
----------------------------------------------------------------------
diff --git a/itests/custom-udfs/udf-vectorized-badexample/pom.xml b/itests/custom-udfs/udf-vectorized-badexample/pom.xml
index 6dc923d..35c1a2f 100644
--- a/itests/custom-udfs/udf-vectorized-badexample/pom.xml
+++ b/itests/custom-udfs/udf-vectorized-badexample/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive-it-custom-udfs</artifactId>
- <version>3.0.0-SNAPSHOT</version>
+ <version>2.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/itests/hcatalog-unit/pom.xml
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/pom.xml b/itests/hcatalog-unit/pom.xml
index c157aed..3ef87f9 100644
--- a/itests/hcatalog-unit/pom.xml
+++ b/itests/hcatalog-unit/pom.xml
@@ -25,7 +25,7 @@
<parent>
<groupId>org.apache.hive</groupId>
<artifactId>hive-it</artifactId>
- <version>3.0.0-SNAPSHOT</version>
+ <version>2.2.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/hive/blob/ed64a74e/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
index 93ff498..8468b84 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.hive.metastore.RawStore;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.AggrStats;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
@@ -916,12 +915,6 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable {
}
@Override
- public Map<String, ColumnStatisticsObj> getAggrColStatsForTablePartitions(String dbName,
- String tableName) throws MetaException, NoSuchObjectException {
- return objectStore.getAggrColStatsForTablePartitions(dbName, tableName);
- }
-
- @Override
@CanNotRetry
public Boolean commitTransactionExpectDeadlock() {
return null;