You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lens.apache.org by pr...@apache.org on 2017/02/28 12:48:20 UTC
[2/8] lens git commit: LENS-1389: Back Merge with master and fix
lens-cube tests
http://git-wip-us.apache.org/repos/asf/lens/blob/2aaf6e0a/lens-regression/src/test/java/org/apache/lens/regression/throttling/ITThrottlingTests.java
----------------------------------------------------------------------
diff --git a/lens-regression/src/test/java/org/apache/lens/regression/throttling/ITThrottlingTests.java b/lens-regression/src/test/java/org/apache/lens/regression/throttling/ITThrottlingTests.java
new file mode 100644
index 0000000..6e34cb5
--- /dev/null
+++ b/lens-regression/src/test/java/org/apache/lens/regression/throttling/ITThrottlingTests.java
@@ -0,0 +1,605 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.lens.regression.throttling;
+
+import java.lang.reflect.Method;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.client.WebTarget;
+
+import org.apache.lens.api.Priority;
+import org.apache.lens.api.query.LensQuery;
+import org.apache.lens.api.query.QueryHandle;
+import org.apache.lens.api.query.QueryStatus;
+import org.apache.lens.cube.parse.CubeQueryConfUtil;
+import org.apache.lens.driver.hive.HiveDriver;
+import org.apache.lens.regression.core.constants.DriverConfig;
+import org.apache.lens.regression.core.constants.QueryInventory;
+import org.apache.lens.regression.core.helpers.ServiceManagerHelper;
+import org.apache.lens.regression.core.testHelper.BaseTestClass;
+import org.apache.lens.regression.util.Util;
+import org.apache.lens.server.api.LensConfConstants;
+import org.apache.lens.server.api.util.LensUtil;
+
+import org.apache.log4j.Logger;
+
+import org.testng.Assert;
+import org.testng.annotations.*;
+
+public class ITThrottlingTests extends BaseTestClass {
+
+ // Value returned by ServiceManagerHelper.init(); assigned once in initialize().
+ WebTarget servLens;
+ // Handle of the session opened with no explicit user in setUp(); closed again in afterMethod().
+ String sessionHandleString;
+
+ // Query fixtures. The COST_* entries come from the inventory keys HIVE.SLEEP_COST_<n>; the second
+ // String.format argument fills the placeholder in the inventory text (presumably the sleep time - TODO confirm).
+ public static final String SLEEP_QUERY = QueryInventory.getSleepQuery("5");
+ public static final String COST_95 = String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_95"), "5");
+ public static final String COST_60 = String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_60"), "5");
+ public static final String COST_30 = String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_30"), "5");
+ public static final String COST_20 = String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_20"), "4");
+ public static final String COST_10 = String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_10"), "4");
+ public static final String COST_5 = String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_5"), "3");
+ public static final String JDBC_QUERY1 = QueryInventory.getQueryFromInventory("JDBC.QUERY1");
+
+ // Driver name passed as the last argument of qHelper.getQueryHandleList(...) in the polling tests.
+ private static String hiveDriver = "hive/hive1";
+ // Driver config file rewritten by the tests, and the copy that setUp() takes and afterMethod() restores.
+ private final String hiveDriverConf = lens.getServerDir() + "/conf/drivers/hive/hive1/hivedriver-site.xml";
+ private final String backupConfFilePath = lens.getServerDir() + "/conf/drivers/hive/hive1/backup-hivedriver-site.xml";
+
+ private static final long SECONDS_IN_A_MINUTE = 60;
+ // Two extra sessions opened per test for users "diff1" and "diff2" (see setUp()).
+ private String session1 = null, session2 = null;
+ //TODO : Read queue names from property file
+ // Queue names used as the MAPRED_JOB_QUEUE_NAME session parameter and in the QUEUE_MAX_CONCURRENT settings.
+ private static String queue1 = "dwh", queue2 = "reports";
+
+ private static Logger logger = Logger.getLogger(ITThrottlingTests.class);
+
+  /**
+   * One-time class setup: initialises the service target, writes the baseline hive driver settings
+   * (max concurrent queries = 10 and the HS2 priority ranges) into the driver config file and
+   * restarts Lens.
+   *
+   * @throws Exception propagated from the helper calls
+   */
+  @BeforeClass(alwaysRun = true)
+  public void initialize() throws Exception {
+    servLens = ServiceManagerHelper.init();
+
+    HashMap<String, String> baselineDriverConf = LensUtil.getHashMap(
+        DriverConfig.MAX_CONCURRENT_QUERIES, "10",
+        HiveDriver.HS2_PRIORITY_RANGES, "HIGH,7,NORMAL,30,LOW,90,VERY_LOW");
+    Util.changeConfig(baselineDriverConf, hiveDriverConf);
+
+    lens.restart();
+  }
+
+  /**
+   * Per-test setup: logs the test name, copies the hive driver config to the backup path and
+   * opens the three sessions used by the tests, switching off FAIL_QUERY_ON_PARTIAL_DATA on each.
+   *
+   * @param method the test method about to run (used for logging only)
+   * @throws Exception propagated from the helper calls
+   */
+  @BeforeMethod(alwaysRun = true)
+  public void setUp(Method method) throws Exception {
+    logger.info("Test Name: " + method.getName());
+
+    // Snapshot the driver config; afterMethod() copies it back.
+    String backupCommand = "cp " + hiveDriverConf + " " + backupConfFilePath;
+    Util.runRemoteCommand(backupCommand);
+
+    sessionHandleString = sHelper.openSession(lens.getCurrentDB());
+    session1 = sHelper.openSession("diff1", "diff1", lens.getCurrentDB());
+    session2 = sHelper.openSession("diff2", "diff2", lens.getCurrentDB());
+
+    sHelper.setAndValidateParam(CubeQueryConfUtil.FAIL_QUERY_ON_PARTIAL_DATA, "false");
+    for (String extraSession : new String[] {session1, session2}) {
+      sHelper.setAndValidateParam(extraSession, CubeQueryConfUtil.FAIL_QUERY_ON_PARTIAL_DATA, "false");
+    }
+  }
+
+  /**
+   * Per-test cleanup: kills every QUEUED, RUNNING and EXECUTED query, closes the three sessions opened
+   * in setUp() and copies the backed-up hive driver config over the one the test modified.
+   *
+   * <p>The steps are chained with try/finally so that a failure while killing queries or closing a
+   * session does not stop the driver config from being restored; the failure is still propagated.
+   *
+   * @param method the test method that just ran (used for logging only)
+   * @throws Exception propagated from the helper calls
+   */
+  @AfterMethod(alwaysRun = true)
+  public void afterMethod(Method method) throws Exception {
+    logger.info("Test Name: " + method.getName());
+    try {
+      qHelper.killQuery(null, "QUEUED", "all");
+      qHelper.killQuery(null, "RUNNING", "all");
+      qHelper.killQuery(null, "EXECUTED", "all");
+    } finally {
+      try {
+        sHelper.closeSession(session1);
+        sHelper.closeSession(session2);
+        sHelper.closeSession(sessionHandleString);
+      } finally {
+        // Always put the original driver config back, otherwise the next test starts from this test's settings.
+        Util.runRemoteCommand("cp " + backupConfFilePath + " " + hiveDriverConf);
+      }
+    }
+  }
+
+  /**
+   * Class-level teardown. Despite its name this method closes no session; it only restarts Lens
+   * after the last test has run.
+   *
+   * @throws Exception propagated from the restart
+   */
+  @AfterClass(alwaysRun = true)
+  public void closeSession() throws Exception {
+    lens.restart();
+  }
+
+
+  /**
+   * With the driver limited to two concurrent queries, submits four sleep queries and expects the
+   * first two to be RUNNING and the last two QUEUED; once the first query completes, the third is
+   * expected to be RUNNING.
+   *
+   * @throws Exception propagated from the helper calls
+   */
+  @Test(enabled = true)
+  public void testHiveThrottling() throws Exception {
+
+    HashMap<String, String> driverConf = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, "2");
+    Util.changeConfig(driverConf, hiveDriverConf);
+    lens.restart();
+
+    List<QueryHandle> submitted = new ArrayList<>();
+    submitted.add((QueryHandle) qHelper.executeQuery(SLEEP_QUERY).getData());
+    submitted.add((QueryHandle) qHelper.executeQuery(SLEEP_QUERY, null, session1).getData());
+    submitted.add((QueryHandle) qHelper.executeQuery(SLEEP_QUERY, null, session2).getData());
+    submitted.add((QueryHandle) qHelper.executeQuery(SLEEP_QUERY, null).getData());
+
+    Thread.sleep(1000);
+
+    // Fetch every status first, then compare, so all four are read before any assertion can fail.
+    QueryStatus[] observed = new QueryStatus[submitted.size()];
+    for (int i = 0; i < observed.length; i++) {
+      observed[i] = qHelper.getQueryStatus(submitted.get(i));
+    }
+
+    QueryStatus.Status[] expected = {
+      QueryStatus.Status.RUNNING, QueryStatus.Status.RUNNING,
+      QueryStatus.Status.QUEUED, QueryStatus.Status.QUEUED,
+    };
+    for (int i = 0; i < expected.length; i++) {
+      Assert.assertEquals(observed[i].getStatus(), expected[i]);
+    }
+
+    qHelper.waitForCompletion(submitted.get(0));
+    Thread.sleep(100);
+    QueryStatus.Status thirdQueryStatus = qHelper.getQueryStatus(submitted.get(2)).getStatus();
+    Assert.assertEquals(thirdQueryStatus, QueryStatus.Status.RUNNING);
+  }
+
+
+  /**
+   * Submits a mix of hive and JDBC queries, keeps adding more while polling, and asserts on every
+   * poll that the number of RUNNING queries on the hive driver never exceeds the configured maximum.
+   * Polling stops when nothing is RUNNING or QUEUED, or after five minutes; all submitted queries
+   * must then finish SUCCESSFUL.
+   *
+   * @throws Exception propagated from the helper calls
+   */
+  @Test(enabled = true)
+  public void testHiveMaxConcurrentRandomQueryIngestion() throws Exception {
+
+    final long timeToWait = 5 * SECONDS_IN_A_MINUTE; // in seconds
+    final int sleepTime = 3;
+    final int maxConcurrent = 4;
+    List<QueryHandle> submitted = new ArrayList<>();
+
+    HashMap<String, String> driverConf = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES,
+        String.valueOf(maxConcurrent));
+    Util.changeConfig(driverConf, hiveDriverConf);
+    lens.restart();
+
+    int batch = 0;
+    while (batch < 5) {
+      submitted.add((QueryHandle) qHelper.executeQuery(SLEEP_QUERY).getData());
+      submitted.add((QueryHandle) qHelper.executeQuery(JDBC_QUERY1).getData());
+      submitted.add((QueryHandle) qHelper.executeQuery(QueryInventory.HIVE_CUBE_QUERY, null, session1).getData());
+      batch++;
+    }
+
+    Thread.sleep(50);
+
+    List<QueryHandle> running = null;
+    List<QueryHandle> queued = null;
+    int elapsed = 0; // seconds slept so far
+    while (elapsed < timeToWait) {
+
+      running = qHelper.getQueryHandleList(null, "RUNNING", "all", sessionHandleString, null, null, hiveDriver);
+      queued = qHelper.getQueryHandleList(null, "QUEUED", "all", sessionHandleString, null, null, hiveDriver);
+      logger.info("Running query count : " + running.size() + "\t Queued query count : " + queued.size());
+
+      if (running.isEmpty() && queued.isEmpty()) {
+        break;
+      }
+      Assert.assertTrue(running.size() <= maxConcurrent);
+
+      // Every 30 seconds during the first 200 seconds, push three more queries into the system.
+      boolean injectMore = elapsed % 30 == 0 && elapsed < 200;
+      if (injectMore) {
+        submitted.add((QueryHandle) qHelper.executeQuery(QueryInventory.HIVE_DIM_QUERY).getData());
+        submitted.add((QueryHandle) qHelper.executeQuery(QueryInventory.JDBC_CUBE_QUERY).getData());
+        submitted.add((QueryHandle) qHelper.executeQuery(QueryInventory.HIVE_CUBE_QUERY, null, session1).getData());
+      }
+      TimeUnit.SECONDS.sleep(sleepTime);
+      elapsed += sleepTime;
+    }
+
+    Assert.assertTrue(running.isEmpty());
+    Assert.assertTrue(queued.isEmpty());
+
+    for (QueryHandle handle : submitted) {
+      LensQuery finished = qHelper.waitForCompletion(handle);
+      Assert.assertEquals(finished.getStatus().getStatus(), QueryStatus.Status.SUCCESSFUL);
+    }
+  }
+
+  /**
+   * Checks per-priority limits (HIGH=2, VERY_LOW=1) combined with a driver-wide limit of 5 by
+   * submitting eight queries and comparing each query's status with the expected one.
+   *
+   * @throws Exception propagated from the helper calls
+   */
+  @Test(enabled = true)
+  public void testProrityMaxConcurrent() throws Exception {
+
+    HashMap<String, String> driverConf = LensUtil.getHashMap(
+        DriverConfig.MAX_CONCURRENT_QUERIES, "5",
+        DriverConfig.PRIORITY_MAX_CONCURRENT, "HIGH=2,VERY_LOW=1",
+        HiveDriver.HS2_PRIORITY_RANGES, "HIGH,7,NORMAL,30,LOW,90,VERY_LOW");
+    Util.changeConfig(driverConf, hiveDriverConf);
+    lens.restart();
+
+    /* Expected outcome, in submission order:
+       - the three COST_5 queries are HIGH priority: two go to RUNNING, the third is QUEUED because HIGH is capped at 2;
+       - the two COST_95 queries are VERY_LOW priority: the first goes to RUNNING, the second is QUEUED (cap of 1);
+       - COST_20 (NORMAL) and the first COST_60 (LOW) go to RUNNING since those priorities have no cap;
+       - the last COST_60 is QUEUED because the driver has by then reached its max concurrent threshold of 5.
+    */
+
+    String[] queries = {COST_5, COST_5, COST_5, COST_95, COST_95, COST_20, COST_60, COST_60};
+    QueryStatus.Status[] expectedStatus = {
+      QueryStatus.Status.RUNNING, QueryStatus.Status.RUNNING, QueryStatus.Status.QUEUED,
+      QueryStatus.Status.RUNNING, QueryStatus.Status.QUEUED,
+      QueryStatus.Status.RUNNING, QueryStatus.Status.RUNNING, QueryStatus.Status.QUEUED,
+    };
+
+    QueryHandle[] handles = new QueryHandle[queries.length];
+    for (int i = 0; i < queries.length; i++) {
+      handles[i] = (QueryHandle) qHelper.executeQuery(queries[i]).getData();
+    }
+
+    QueryStatus.Status[] observed = new QueryStatus.Status[handles.length];
+    for (int i = 0; i < handles.length; i++) {
+      observed[i] = qHelper.getQueryStatus(handles[i]).getStatus();
+    }
+
+    for (int i = 0; i < observed.length; i++) {
+      Assert.assertEquals(observed[i], expectedStatus[i]);
+    }
+
+    // Once the first HIGH query is done, the queued third HIGH query is expected to be RUNNING.
+    qHelper.waitForCompletion(handles[0]);
+    Assert.assertEquals(qHelper.getQueryStatus(handles[2]).getStatus(), QueryStatus.Status.RUNNING);
+  }
+
+  /**
+   * Configures per-priority limits whose sum (2 + 1 + 2 + 4) is larger than the driver-wide limit of 5,
+   * submits twenty queries and polls until the driver drains. On every poll the total number of
+   * RUNNING queries and the RUNNING count per priority must stay within the configured limits.
+   *
+   * @throws Exception propagated from the helper calls, or if a running query has a priority other
+   *                   than HIGH, NORMAL, LOW or VERY_LOW
+   */
+  @Test(enabled = true)
+  public void prioritySumMoreThanMaxConcurrent() throws Exception {
+
+    final long timeToWait = 5 * SECONDS_IN_A_MINUTE; // in seconds
+    final int sleepTime = 5;
+    final int maxConcurrent = 5;
+    final int lowConCurrent = 2;
+    final int veryLowConcurrent = 1;
+    final int highConcurrent = 4;
+    final int normalConcurrent = 2;
+
+    String priorityLimits = "LOW=" + lowConCurrent + ",VERY_LOW=" + veryLowConcurrent
+        + ",NORMAL=" + normalConcurrent + ",HIGH=" + highConcurrent;
+    HashMap<String, String> driverConf = LensUtil.getHashMap(
+        DriverConfig.MAX_CONCURRENT_QUERIES, String.valueOf(maxConcurrent),
+        DriverConfig.PRIORITY_MAX_CONCURRENT, priorityLimits);
+    Util.changeConfig(driverConf, hiveDriverConf);
+    lens.restart();
+
+    List<QueryHandle> submitted = new ArrayList<>();
+    for (int round = 0; round < 5; round++) {
+      submitted.add((QueryHandle) qHelper.executeQuery(COST_95).getData());
+      submitted.add((QueryHandle) qHelper.executeQuery(COST_60, null, session1).getData());
+      submitted.add((QueryHandle) qHelper.executeQuery(COST_20, null, session2).getData());
+      submitted.add((QueryHandle) qHelper.executeQuery(COST_5).getData());
+    }
+
+    List<QueryHandle> running = null;
+    List<QueryHandle> queued = null;
+    int elapsed = 0; // seconds slept so far
+    while (elapsed < timeToWait) {
+
+      running = qHelper.getQueryHandleList(null, "RUNNING", "all", sessionHandleString, null, null, hiveDriver);
+      queued = qHelper.getQueryHandleList(null, "QUEUED", "all", sessionHandleString, null, null, hiveDriver);
+      logger.info("Running query count : " + running.size() + "\t Queued query count : " + queued.size());
+
+      if (running.isEmpty() && queued.isEmpty()) {
+        break;
+      }
+
+      Assert.assertTrue(running.size() <= maxConcurrent);
+
+      // Tally the running queries by priority.
+      int low = 0;
+      int veryLow = 0;
+      int high = 0;
+      int normal = 0;
+      for (QueryHandle handle : running) {
+        Priority p = qHelper.getLensQuery(sessionHandleString, handle).getPriority();
+        Assert.assertNotNull(p);
+        if (p == Priority.HIGH) {
+          high++;
+        } else if (p == Priority.NORMAL) {
+          normal++;
+        } else if (p == Priority.LOW) {
+          low++;
+        } else if (p == Priority.VERY_LOW) {
+          veryLow++;
+        } else {
+          throw new Exception("Unexpected Priority");
+        }
+      }
+
+      Assert.assertTrue(low <= lowConCurrent);
+      Assert.assertTrue(veryLow <= veryLowConcurrent);
+      Assert.assertTrue(high <= highConcurrent);
+      Assert.assertTrue(normal <= normalConcurrent);
+
+      TimeUnit.SECONDS.sleep(sleepTime);
+      elapsed += sleepTime;
+    }
+
+    Assert.assertTrue(queued.isEmpty());
+    Assert.assertTrue(running.isEmpty());
+
+    for (QueryHandle handle : submitted) {
+      LensQuery finished = qHelper.waitForCompletion(handle);
+      Assert.assertEquals(finished.getStatus().getStatus(), QueryStatus.Status.SUCCESSFUL);
+    }
+  }
+
+
+  /**
+   * Checks per-queue limits (queue1=1, queue2=2) under a driver-wide limit of 3: two queries are
+   * submitted on queue1 and three on queue2, and the second queue1 query and the third queue2 query
+   * are expected to be QUEUED until an earlier query on the same queue completes.
+   *
+   * @throws Exception propagated from the helper calls
+   */
+  @Test(enabled = true)
+  public void queueMaxConcurrent() throws Exception {
+
+    final int maxConcurrent = 3;
+    final int queue1Count = 1;
+    final int queue2Count = 2;
+
+    String queueLimits = queue1 + "=" + queue1Count + "," + queue2 + "=" + queue2Count;
+    HashMap<String, String> driverConf = LensUtil.getHashMap(
+        DriverConfig.MAX_CONCURRENT_QUERIES, String.valueOf(maxConcurrent),
+        DriverConfig.QUEUE_MAX_CONCURRENT, queueLimits);
+    List<QueryHandle> submitted = new ArrayList<>();
+
+    Util.changeConfig(driverConf, hiveDriverConf);
+    lens.restart();
+
+    sHelper.setAndValidateParam(LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue1);
+    for (String query : new String[] {COST_60, COST_95}) {
+      submitted.add((QueryHandle) qHelper.executeQuery(query).getData());
+    }
+
+    sHelper.setAndValidateParam(LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue2);
+    for (String query : new String[] {COST_60, COST_95, COST_95}) {
+      submitted.add((QueryHandle) qHelper.executeQuery(query).getData());
+    }
+
+    QueryStatus.Status[] observed = new QueryStatus.Status[submitted.size()];
+    for (int i = 0; i < observed.length; i++) {
+      observed[i] = qHelper.getQueryStatus(submitted.get(i)).getStatus();
+    }
+
+    QueryStatus.Status[] expected = {
+      QueryStatus.Status.RUNNING, QueryStatus.Status.QUEUED,
+      QueryStatus.Status.RUNNING, QueryStatus.Status.RUNNING, QueryStatus.Status.QUEUED,
+    };
+    for (int i = 0; i < expected.length; i++) {
+      Assert.assertEquals(observed[i], expected[i]);
+    }
+
+    // Completing the running queue1 query should let the queued queue1 query run.
+    qHelper.waitForCompletion(submitted.get(0));
+    Assert.assertEquals(qHelper.getQueryStatus(submitted.get(1)).getStatus(), QueryStatus.Status.RUNNING);
+
+    // Completing a running queue2 query should let the queued queue2 query run.
+    qHelper.waitForCompletion(submitted.get(2));
+    Assert.assertEquals(qHelper.getQueryStatus(submitted.get(4)).getStatus(), QueryStatus.Status.RUNNING);
+  }
+
+  /**
+   * LENS-1027. Checks the wildcard queue limit: the queue setting is "*=1" plus an explicit limit of 2
+   * for queue2. Two queries are submitted on queue1 (no explicit entry) and three on queue2; the second
+   * queue1 query and the third queue2 query are expected to be QUEUED.
+   *
+   * @throws Exception propagated from the helper calls
+   */
+  @Test(enabled = true)
+  public void queueDefaultThresholdConstraint() throws Exception {
+
+    final int maxConcurrent = 5;
+    final int queue1Count = 1;
+    final int queue2Count = 2;
+
+    String queueLimits = "*=" + queue1Count + "," + queue2 + "=" + queue2Count;
+    HashMap<String, String> driverConf = LensUtil.getHashMap(
+        DriverConfig.MAX_CONCURRENT_QUERIES, String.valueOf(maxConcurrent),
+        DriverConfig.QUEUE_MAX_CONCURRENT, queueLimits);
+    List<QueryHandle> submitted = new ArrayList<>();
+
+    Util.changeConfig(driverConf, hiveDriverConf);
+    lens.restart();
+
+    sHelper.setAndValidateParam(LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue1);
+    for (String query : new String[] {COST_60, COST_95}) {
+      submitted.add((QueryHandle) qHelper.executeQuery(query).getData());
+    }
+
+    sHelper.setAndValidateParam(LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue2);
+    for (String query : new String[] {COST_60, COST_95, COST_95}) {
+      submitted.add((QueryHandle) qHelper.executeQuery(query).getData());
+    }
+
+    Thread.sleep(2000);
+
+    QueryStatus.Status[] observed = new QueryStatus.Status[submitted.size()];
+    for (int i = 0; i < observed.length; i++) {
+      observed[i] = qHelper.getQueryStatus(submitted.get(i)).getStatus();
+    }
+
+    QueryStatus.Status[] expected = {
+      QueryStatus.Status.RUNNING, QueryStatus.Status.QUEUED,
+      QueryStatus.Status.RUNNING, QueryStatus.Status.RUNNING, QueryStatus.Status.QUEUED,
+    };
+    for (int i = 0; i < expected.length; i++) {
+      Assert.assertEquals(observed[i], expected[i]);
+    }
+  }
+
+
+  /**
+   * Submits nine queries first, then writes the queue limits (queue1=1, queue2=2, driver-wide 4) and
+   * restarts Lens, then submits six more. While polling, the total number of RUNNING queries and the
+   * RUNNING count per queue must stay within the configured limits; polling ends when nothing is
+   * RUNNING or QUEUED, or after five minutes. All submitted queries must finish SUCCESSFUL.
+   *
+   * @throws Exception propagated from the helper calls
+   */
+  @Test(enabled = true)
+  public void enableQueueThrottlingWithExistingQueuedQueries() throws Exception {
+
+    long timeToWait = 5 * SECONDS_IN_A_MINUTE; // in seconds
+    int sleepTime = 5, maxConcurrent = 4, queue1Concurrent = 1, queue2Concurrent = 2;
+    HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES,
+        String.valueOf(maxConcurrent), DriverConfig.QUEUE_MAX_CONCURRENT,
+        queue1 + "=" + String.valueOf(queue1Concurrent) + "," + queue2 + "=" + String.valueOf(queue2Concurrent));
+
+    sHelper.setAndValidateParam(session1, LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue1);
+    sHelper.setAndValidateParam(session2, LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue2);
+
+    // First wave: submitted before the queue limits are written to the driver config.
+    List<QueryHandle> handleList = new ArrayList<QueryHandle>();
+    for (int i = 1; i <= 3; i++) {
+      handleList.add((QueryHandle) qHelper.executeQuery(COST_95).getData());
+      handleList.add((QueryHandle) qHelper.executeQuery(COST_20, "", session1).getData());
+      handleList.add((QueryHandle) qHelper.executeQuery(COST_95, "", session2).getData());
+    }
+
+    Util.changeConfig(map, hiveDriverConf);
+    lens.restart();
+
+    // Second wave: submitted after the restart.
+    for (int i = 1; i <= 2; i++) {
+      handleList.add((QueryHandle) qHelper.executeQuery(COST_95).getData());
+      handleList.add((QueryHandle) qHelper.executeQuery(COST_20, "", session1).getData());
+      handleList.add((QueryHandle) qHelper.executeQuery(COST_95, "", session2).getData());
+    }
+
+    List<QueryHandle> running = null, queued = null;
+    for (int t = 0; t < timeToWait; t = t + sleepTime) {
+
+      running = qHelper.getQueryHandleList(null, "RUNNING", "all", sessionHandleString, null, null, hiveDriver);
+      queued = qHelper.getQueryHandleList(null, "QUEUED", "all", sessionHandleString, null, null, hiveDriver);
+      logger.info("Running query count : " + running.size() + "\t Queued query count : " + queued.size());
+
+      if (running.isEmpty() && queued.isEmpty()) {
+        break;
+      }
+      // Message fixed: the two literals used to run together without a separator and the wording
+      // ("not less than") did not match the <= check.
+      Assert.assertTrue(running.size() <= maxConcurrent, "running query count exceeds max concurrent set, "
+          + "running-count : " + running.size() + ", max-count : " + maxConcurrent);
+
+      // Count the running queries per queue; queries on any other queue are not counted.
+      int queue1Count = 0, queue2Count = 0;
+      for (QueryHandle qh : running) {
+        String queue = qHelper.getLensQuery(sessionHandleString, qh).getQueryConf().getProperties()
+            .get("mapreduce.job.queuename");
+        Assert.assertNotNull(queue);
+
+        if (queue.equals(queue1)) {
+          queue1Count++;
+        } else if (queue.equals(queue2)) {
+          queue2Count++;
+        }
+      }
+
+      Assert.assertTrue(queue1Count <= queue1Concurrent, "queue1 count : " + queue1Count);
+      Assert.assertTrue(queue2Count <= queue2Concurrent, "queue2 count : " + queue2Count);
+      TimeUnit.SECONDS.sleep(sleepTime);
+    }
+
+    Assert.assertTrue(running.isEmpty());
+    Assert.assertTrue(queued.isEmpty());
+
+    for (QueryHandle q : handleList) {
+      LensQuery lq = qHelper.waitForCompletion(q);
+      Assert.assertEquals(lq.getStatus().getStatus(), QueryStatus.Status.SUCCESSFUL);
+    }
+  }
+
+
+  /**
+   * Combines priority limits (LOW=2, VERY_LOW=1) with queue limits (queue1=1, queue2=2) under a
+   * driver-wide limit of 5, submits eight queries across the sessions and compares each query's
+   * status with the expected RUNNING/QUEUED pattern.
+   *
+   * @throws Exception propagated from the helper calls
+   */
+  @Test(enabled = true)
+  public void queueAndPriorityMaxConcurrent() throws Exception {
+
+    HashMap<String, String> driverConf = LensUtil.getHashMap(
+        DriverConfig.MAX_CONCURRENT_QUERIES, "5",
+        DriverConfig.PRIORITY_MAX_CONCURRENT, "LOW=2,VERY_LOW=1",
+        DriverConfig.QUEUE_MAX_CONCURRENT, queue1 + "=1," + queue2 + "=2");
+
+    Util.changeConfig(driverConf, hiveDriverConf);
+    lens.restart();
+
+    sHelper.setAndValidateParam(session1, LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue1);
+    sHelper.setAndValidateParam(session2, LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue2);
+
+    QueryStatus.Status[] expectedStatus = {
+      QueryStatus.Status.RUNNING, QueryStatus.Status.QUEUED, QueryStatus.Status.QUEUED,
+      QueryStatus.Status.RUNNING, QueryStatus.Status.RUNNING, QueryStatus.Status.RUNNING,
+      QueryStatus.Status.QUEUED, QueryStatus.Status.QUEUED,
+    };
+
+    List<QueryHandle> submitted = new ArrayList<>();
+    submitted.add((QueryHandle) qHelper.executeQuery(COST_95, null, session1).getData());
+    submitted.add((QueryHandle) qHelper.executeQuery(COST_20, null, session1).getData());
+    submitted.add((QueryHandle) qHelper.executeQuery(COST_95, null, session2).getData());
+    submitted.add((QueryHandle) qHelper.executeQuery(COST_60, null, session2).getData());
+    submitted.add((QueryHandle) qHelper.executeQuery(COST_5, null, session2).getData());
+    submitted.add((QueryHandle) qHelper.executeQuery(COST_60).getData());
+    submitted.add((QueryHandle) qHelper.executeQuery(COST_20, null, session2).getData());
+    submitted.add((QueryHandle) qHelper.executeQuery(COST_5, null, session2).getData());
+
+    // Read every status before asserting on any of them.
+    QueryStatus[] observed = new QueryStatus[submitted.size()];
+    int next = 0;
+    for (QueryHandle handle : submitted) {
+      observed[next++] = qHelper.getQueryStatus(handle);
+    }
+
+    int index = 0;
+    while (index < expectedStatus.length) {
+      Assert.assertEquals(observed[index].getStatus(), expectedStatus[index], "failed : query-" + index);
+      index++;
+    }
+  }
+
+
+  /**
+   * Configures priority limits (HIGH=2, NORMAL=1) together with queue limits (queue1=1, queue2=2)
+   * under a driver-wide limit of 5, submits eighteen queries across sessions and polls until the
+   * driver drains. On every poll the total RUNNING count, the RUNNING count for each limited
+   * priority and the RUNNING count for each limited queue must stay within the configured limits.
+   * All submitted queries must finally be SUCCESSFUL.
+   *
+   * <p>The priority counters track HIGH and NORMAL, i.e. the priorities that are actually limited by
+   * the configuration written here. Counting LOW and VERY_LOW instead would compare unrelated
+   * priorities against those limits and leave the configured constraint unverified.
+   *
+   * @throws Exception propagated from the helper calls
+   */
+  @Test(enabled = true)
+  public void queueAndPriorityMaxConcurrentMany() throws Exception {
+
+    long timeToWait = 5 * SECONDS_IN_A_MINUTE; // in seconds
+    int sleepTime = 5, maxConcurrent = 5, queue1Concurrent = 1, queue2Concurrent = 2;
+    int highConcurrent = 2, normalConcurrent = 1;
+
+    HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES,
+        String.valueOf(maxConcurrent), DriverConfig.PRIORITY_MAX_CONCURRENT,
+        "HIGH=" + String.valueOf(highConcurrent) + ",NORMAL=" + String.valueOf(normalConcurrent),
+        DriverConfig.QUEUE_MAX_CONCURRENT,
+        queue1 + "=" + String.valueOf(queue1Concurrent) + "," + queue2 + "=" + String.valueOf(queue2Concurrent));
+
+    Util.changeConfig(map, hiveDriverConf);
+    lens.restart();
+
+    sHelper.setAndValidateParam(session1, LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue1);
+    sHelper.setAndValidateParam(session2, LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue2);
+
+    List<QueryHandle> handleList = new ArrayList<QueryHandle>();
+    for (int i = 1; i <= 3; i++) {
+      handleList.add((QueryHandle) qHelper.executeQuery(COST_5).getData());
+      handleList.add((QueryHandle) qHelper.executeQuery(COST_20, "", session1).getData());
+      handleList.add((QueryHandle) qHelper.executeQuery(COST_60, "", session2).getData());
+      handleList.add((QueryHandle) qHelper.executeQuery(COST_20).getData());
+      handleList.add((QueryHandle) qHelper.executeQuery(COST_95, "", session1).getData());
+      handleList.add((QueryHandle) qHelper.executeQuery(COST_5, "", session2).getData());
+    }
+
+    List<QueryHandle> running = null, queued = null;
+    for (int t = 0; t < timeToWait; t = t + sleepTime) {
+
+      running = qHelper.getQueryHandleList(null, "RUNNING", "all", sessionHandleString, null, null, hiveDriver);
+      queued = qHelper.getQueryHandleList(null, "QUEUED", "all", sessionHandleString, null, null, hiveDriver);
+      logger.info("Running query count : " + running.size() + "\t Queued query count : " + queued.size());
+
+      if (running.isEmpty() && queued.isEmpty()) {
+        break;
+      }
+
+      Assert.assertTrue(running.size() <= maxConcurrent);
+
+      int highCount = 0, normalCount = 0, queue1Count = 0, queue2Count = 0;
+      for (QueryHandle qh : running) {
+        // Fetch the query once so priority and queue are read from the same snapshot.
+        LensQuery lensQuery = qHelper.getLensQuery(sessionHandleString, qh);
+        Priority priority = lensQuery.getPriority();
+        String queue = lensQuery.getQueryConf().getProperties().get("mapreduce.job.queuename");
+
+        Assert.assertNotNull(priority);
+        Assert.assertNotNull(queue);
+
+        // Only the priorities limited in PRIORITY_MAX_CONCURRENT above are counted.
+        if (priority.equals(Priority.HIGH)) {
+          highCount++;
+        } else if (priority.equals(Priority.NORMAL)) {
+          normalCount++;
+        }
+
+        if (queue.equals(queue1)) {
+          queue1Count++;
+        } else if (queue.equals(queue2)) {
+          queue2Count++;
+        }
+      }
+
+      Assert.assertTrue(highCount <= highConcurrent, "HIGH priority count : " + highCount);
+      Assert.assertTrue(normalCount <= normalConcurrent, "NORMAL priority count : " + normalCount);
+      Assert.assertTrue(queue1Count <= queue1Concurrent, "queue-1 count : " + queue1Count);
+      Assert.assertTrue(queue2Count <= queue2Concurrent, "queue-2 count : " + queue2Count);
+
+      TimeUnit.SECONDS.sleep(sleepTime);
+    }
+
+    Assert.assertTrue(queued.isEmpty());
+    Assert.assertTrue(running.isEmpty());
+
+    for (QueryHandle q : handleList) {
+      LensQuery lq = qHelper.waitForCompletion(q);
+      Assert.assertEquals(lq.getStatus().getStatus(), QueryStatus.Status.SUCCESSFUL);
+    }
+  }
+
+  /**
+   * LENS-973 (the scenario is described in the JIRA issue).
+   *
+   * <p>With queue limits queue1=1 and queue2=3 under a driver-wide limit of 3: submits one sleep query
+   * from a fresh session on queue2 plus two cube queries, closes that session and restarts Lens. The
+   * first query must not be finished after the restart; six more queries are then submitted and
+   * every query must end SUCCESSFUL.
+   *
+   * @throws Exception propagated from the helper calls
+   */
+  @Test(enabled = true)
+  public void queueConstraintFailureOnRestart() throws Exception {
+
+    List<QueryHandle> submitted = new ArrayList<>();
+
+    HashMap<String, String> driverConf = LensUtil.getHashMap(
+        DriverConfig.MAX_CONCURRENT_QUERIES, "3",
+        DriverConfig.QUEUE_MAX_CONCURRENT, queue1 + "=1," + queue2 + "=3");
+    Util.changeConfig(driverConf, hiveDriverConf);
+    lens.restart();
+
+    String newSession = sHelper.openSession("user", "pwd", lens.getCurrentDB());
+    sHelper.setAndValidateParam(newSession, LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue2);
+    submitted.add((QueryHandle) qHelper.executeQuery(QueryInventory.SLEEP_QUERY, null, newSession).getData());
+
+    int cubeQueries = 0;
+    while (cubeQueries < 2) {
+      submitted.add((QueryHandle) qHelper.executeQuery(QueryInventory.HIVE_CUBE_QUERY).getData());
+      cubeQueries++;
+    }
+
+    sHelper.closeSession(newSession);
+    lens.restart();
+
+    boolean firstQueryFinished = qHelper.getQueryStatus(submitted.get(0)).finished();
+    Assert.assertFalse(firstQueryFinished);
+
+    int dimQueries = 0;
+    while (dimQueries < 6) {
+      submitted.add((QueryHandle) qHelper.executeQuery(QueryInventory.HIVE_DIM_QUERY).getData());
+      dimQueries++;
+    }
+
+    for (QueryHandle handle : submitted) {
+      LensQuery finished = qHelper.waitForCompletion(handle);
+      Assert.assertEquals(finished.getStatus().getStatus(), QueryStatus.Status.SUCCESSFUL);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/lens/blob/2aaf6e0a/lens-regression/src/test/java/org/apache/lens/regression/throttling/Throttling.java
----------------------------------------------------------------------
diff --git a/lens-regression/src/test/java/org/apache/lens/regression/throttling/Throttling.java b/lens-regression/src/test/java/org/apache/lens/regression/throttling/Throttling.java
deleted file mode 100644
index abf7263..0000000
--- a/lens-regression/src/test/java/org/apache/lens/regression/throttling/Throttling.java
+++ /dev/null
@@ -1,604 +0,0 @@
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.lens.regression.throttling;
-
-import java.lang.reflect.Method;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-
-import javax.ws.rs.client.WebTarget;
-
-import org.apache.lens.api.Priority;
-import org.apache.lens.api.query.LensQuery;
-import org.apache.lens.api.query.QueryHandle;
-import org.apache.lens.api.query.QueryStatus;
-import org.apache.lens.cube.parse.CubeQueryConfUtil;
-import org.apache.lens.driver.hive.HiveDriver;
-import org.apache.lens.regression.core.constants.DriverConfig;
-import org.apache.lens.regression.core.constants.QueryInventory;
-import org.apache.lens.regression.core.helpers.ServiceManagerHelper;
-import org.apache.lens.regression.core.testHelper.BaseTestClass;
-import org.apache.lens.regression.util.Util;
-import org.apache.lens.server.api.LensConfConstants;
-import org.apache.lens.server.api.util.LensUtil;
-
-import org.apache.log4j.Logger;
-
-import org.testng.Assert;
-import org.testng.annotations.*;
-
-public class Throttling extends BaseTestClass {
-
- WebTarget servLens;
- String sessionHandleString;
-
- public static final String SLEEP_QUERY = QueryInventory.getSleepQuery("5");
- public static final String COST_95 = String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_95"), "5");
- public static final String COST_60 = String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_60"), "5");
- public static final String COST_30 = String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_30"), "5");
- public static final String COST_20 = String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_20"), "4");
- public static final String COST_10 = String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_10"), "4");
- public static final String COST_5 = String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_5"), "3");
- public static final String JDBC_QUERY1 = QueryInventory.getQueryFromInventory("JDBC.QUERY1");
-
- private static String hiveDriver = "hive/hive1";
- private final String hiveDriverConf = lens.getServerDir() + "/conf/drivers/hive/hive1/hivedriver-site.xml";
- private final String backupConfFilePath = lens.getServerDir() + "/conf/drivers/hive/hive1/backup-hivedriver-site.xml";
-
- private static final long SECONDS_IN_A_MINUTE = 60;
- private String session1 = null, session2 = null;
- //TODO : Read queue names from property file
- private static String queue1 = "dwh", queue2 = "reports";
-
- private static Logger logger = Logger.getLogger(Throttling.class);
-
- @BeforeClass(alwaysRun = true)
- public void initialize() throws Exception {
- servLens = ServiceManagerHelper.init();
- HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, "10",
- HiveDriver.HS2_PRIORITY_RANGES, "HIGH,7,NORMAL,30,LOW,90,VERY_LOW");
- Util.changeConfig(map, hiveDriverConf);
- lens.restart();
- }
-
- @BeforeMethod(alwaysRun = true)
- public void setUp(Method method) throws Exception {
- logger.info("Test Name: " + method.getName());
- Util.runRemoteCommand("cp " + hiveDriverConf + " " + backupConfFilePath);
-
- sessionHandleString = sHelper.openSession(lens.getCurrentDB());
- session1 = sHelper.openSession("diff1", "diff1", lens.getCurrentDB());
- session2 = sHelper.openSession("diff2", "diff2", lens.getCurrentDB());
- sHelper.setAndValidateParam(CubeQueryConfUtil.FAIL_QUERY_ON_PARTIAL_DATA, "false");
- sHelper.setAndValidateParam(session1, CubeQueryConfUtil.FAIL_QUERY_ON_PARTIAL_DATA, "false");
- sHelper.setAndValidateParam(session2, CubeQueryConfUtil.FAIL_QUERY_ON_PARTIAL_DATA, "false");
- }
-
- @AfterMethod(alwaysRun = true)
- public void afterMethod(Method method) throws Exception {
- logger.info("Test Name: " + method.getName());
- qHelper.killQuery(null, "QUEUED", "all");
- qHelper.killQuery(null, "RUNNING", "all");
- qHelper.killQuery(null, "EXECUTED", "all");
- sHelper.closeSession(session1);
- sHelper.closeSession(session2);
- sHelper.closeSession(sessionHandleString);
-
- Util.runRemoteCommand("cp " + backupConfFilePath + " " + hiveDriverConf);
- }
-
- @AfterClass(alwaysRun = false)
- public void closeSession() throws Exception {
- lens.restart();
- }
-
-
- @Test(enabled = true)
- public void testHiveThrottling() throws Exception {
-
- HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, "2");
- Util.changeConfig(map, hiveDriverConf);
- lens.restart();
-
- List<QueryHandle> handleList = new ArrayList<>();
- handleList.add((QueryHandle) qHelper.executeQuery(SLEEP_QUERY).getData());
- handleList.add((QueryHandle) qHelper.executeQuery(SLEEP_QUERY, null, session1).getData());
- handleList.add((QueryHandle) qHelper.executeQuery(SLEEP_QUERY, null, session2).getData());
- handleList.add((QueryHandle) qHelper.executeQuery(SLEEP_QUERY, null).getData());
-
- Thread.sleep(1000);
-
- List<QueryStatus> statusList = new ArrayList<>();
- for(QueryHandle handle : handleList){
- statusList.add(qHelper.getQueryStatus(handle));
- }
-
- Assert.assertEquals(statusList.get(0).getStatus(), QueryStatus.Status.RUNNING);
- Assert.assertEquals(statusList.get(1).getStatus(), QueryStatus.Status.RUNNING);
- Assert.assertEquals(statusList.get(2).getStatus(), QueryStatus.Status.QUEUED);
- Assert.assertEquals(statusList.get(3).getStatus(), QueryStatus.Status.QUEUED);
-
- qHelper.waitForCompletion(handleList.get(0));
- Thread.sleep(100);
- Assert.assertEquals(qHelper.getQueryStatus(handleList.get(2)).getStatus(), QueryStatus.Status.RUNNING);
- }
-
-
- @Test(enabled = true)
- public void testHiveMaxConcurrentRandomQueryIngestion() throws Exception {
-
- long timeToWait= 5 * SECONDS_IN_A_MINUTE; // in seconds
- int sleepTime = 3, maxConcurrent = 4;
- List<QueryHandle> handleList = new ArrayList<QueryHandle>();
-
- HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES,
- String.valueOf(maxConcurrent));
- Util.changeConfig(map, hiveDriverConf);
- lens.restart();
-
- for (int i = 0; i < 5; i++) {
- handleList.add((QueryHandle) qHelper.executeQuery(SLEEP_QUERY).getData());
- handleList.add((QueryHandle) qHelper.executeQuery(JDBC_QUERY1).getData());
- handleList.add((QueryHandle) qHelper.executeQuery(QueryInventory.HIVE_CUBE_QUERY, null, session1).getData());
- }
-
- Thread.sleep(50);
-
- List<QueryHandle> running = null, queued = null;
- for (int t = 0; t < timeToWait; t = t + sleepTime) {
-
- running = qHelper.getQueryHandleList(null, "RUNNING", "all", sessionHandleString, null, null, hiveDriver);
- queued = qHelper.getQueryHandleList(null, "QUEUED", "all", sessionHandleString, null, null, hiveDriver);
- logger.info("Running query count : " + running.size() + "\t Queued query count : " + queued.size());
-
- if (running.isEmpty() && queued.isEmpty()) {
- break;
- }
- Assert.assertTrue(running.size() <= maxConcurrent);
-
- if (t % 30 == 0 && t < 200) {
- handleList.add((QueryHandle) qHelper.executeQuery(QueryInventory.HIVE_DIM_QUERY).getData());
- handleList.add((QueryHandle) qHelper.executeQuery(QueryInventory.JDBC_CUBE_QUERY).getData());
- handleList.add((QueryHandle) qHelper.executeQuery(QueryInventory.HIVE_CUBE_QUERY, null, session1).getData());
- }
- TimeUnit.SECONDS.sleep(sleepTime);
- }
-
- Assert.assertTrue(running.isEmpty());
- Assert.assertTrue(queued.isEmpty());
-
- for(QueryHandle q : handleList){
- LensQuery lq = qHelper.waitForCompletion(q);
- Assert.assertEquals(lq.getStatus().getStatus(), QueryStatus.Status.SUCCESSFUL);
- }
- }
-
- @Test(enabled = true)
- public void testProrityMaxConcurrent() throws Exception {
-
- HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, "5",
- DriverConfig.PRIORITY_MAX_CONCURRENT, "HIGH=2,VERY_LOW=1", HiveDriver.HS2_PRIORITY_RANGES,
- "HIGH,7,NORMAL,30,LOW,90,VERY_LOW");
- Util.changeConfig(map, hiveDriverConf);
- lens.restart();
-
- /* First 3 are high priority queries, 2 of them will go to RUNNING, 3rd will be queued as there is a
- threshold of 2 on HIGH priority. cost_95 queries are very_low priority ones, hence 1st will go to RUNNING,
- other 1 is queued. cost_20 and cost_60 goes to running as they are normal and low priority queries and there
- is no limit set for low and normal priority.Last cost_60 query goes to queue as, RUNNING query on this
- driver has reached max concurrent threshold.
- */
-
- String[] queries = {COST_5, COST_5, COST_5, COST_95, COST_95, COST_20, COST_60, COST_60};
- QueryStatus.Status[] expectedStatus = {QueryStatus.Status.RUNNING, QueryStatus.Status.RUNNING,
- QueryStatus.Status.QUEUED, QueryStatus.Status.RUNNING, QueryStatus.Status.QUEUED,
- QueryStatus.Status.RUNNING, QueryStatus.Status.RUNNING, QueryStatus.Status.QUEUED, };
-
- List<QueryHandle> handleList = new ArrayList<>();
- for (String query : queries){
- handleList.add((QueryHandle) qHelper.executeQuery(query).getData());
- }
-
- List<QueryStatus.Status> statusList = new ArrayList<>();
- for (QueryHandle handle : handleList){
- statusList.add(qHelper.getQueryStatus(handle).getStatus());
- }
-
- for (int i=0; i<statusList.size(); i++){
- Assert.assertEquals(statusList.get(i), expectedStatus[i]);
- }
-
- qHelper.waitForCompletion(handleList.get(0));
- Assert.assertEquals(qHelper.getQueryStatus(handleList.get(2)).getStatus(), QueryStatus.Status.RUNNING);
- }
-
- @Test(enabled = true)
- public void prioritySumMoreThanMaxConcurrent() throws Exception {
-
- long timeToWait= 5 * SECONDS_IN_A_MINUTE; // in seconds
- int sleepTime = 5, maxConcurrent = 5, lowConCurrent = 2, veryLowConcurrent = 1, highConcurrent = 4,
- normalConcurrent = 2;
- HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES,
- String.valueOf(maxConcurrent), DriverConfig.PRIORITY_MAX_CONCURRENT,
- "LOW=" + String.valueOf(lowConCurrent) + ",VERY_LOW=" + String.valueOf(veryLowConcurrent)
- + ",NORMAL=" + String.valueOf(normalConcurrent) + ",HIGH=" + String.valueOf(highConcurrent));
- Util.changeConfig(map, hiveDriverConf);
- lens.restart();
-
- List<QueryHandle> handleList = new ArrayList<QueryHandle>();
- for (int i=1; i<=5; i++){
- handleList.add((QueryHandle) qHelper.executeQuery(COST_95).getData());
- handleList.add((QueryHandle) qHelper.executeQuery(COST_60, null, session1).getData());
- handleList.add((QueryHandle) qHelper.executeQuery(COST_20, null, session2).getData());
- handleList.add((QueryHandle) qHelper.executeQuery(COST_5).getData());
- }
-
- List<QueryHandle> running=null, queued=null;
- for (int t = 0; t < timeToWait; t = t + sleepTime) {
-
- running = qHelper.getQueryHandleList(null, "RUNNING", "all", sessionHandleString, null, null, hiveDriver);
- queued = qHelper.getQueryHandleList(null, "QUEUED", "all", sessionHandleString, null, null, hiveDriver);
- logger.info("Running query count : " + running.size() + "\t Queued query count : " + queued.size());
-
- if (running.isEmpty() && queued.isEmpty()) {
- break;
- }
-
- Assert.assertTrue(running.size() <= maxConcurrent);
-
- int low = 0, veryLow = 0, high = 0, normal = 0;
- for (QueryHandle qh : running) {
- Priority p = qHelper.getLensQuery(sessionHandleString, qh).getPriority();
- Assert.assertNotNull(p);
- switch (p) {
- case HIGH:
- high++;
- break;
- case NORMAL:
- normal++;
- break;
- case LOW:
- low++;
- break;
- case VERY_LOW:
- veryLow++;
- break;
- default:
- throw new Exception("Unexpected Priority");
- }
- }
-
- Assert.assertTrue(low <= lowConCurrent);
- Assert.assertTrue(veryLow <= veryLowConcurrent);
- Assert.assertTrue(high <= highConcurrent);
- Assert.assertTrue(normal <= normalConcurrent);
-
- TimeUnit.SECONDS.sleep(sleepTime);
- }
-
- Assert.assertTrue(queued.isEmpty());
- Assert.assertTrue(running.isEmpty());
-
- for (QueryHandle q: handleList){
- LensQuery lq = qHelper.waitForCompletion(q);
- Assert.assertEquals(lq.getStatus().getStatus(), QueryStatus.Status.SUCCESSFUL);
- }
- }
-
-
- @Test(enabled = true)
- public void queueMaxConcurrent() throws Exception {
-
- int maxConcurrent = 3, queue1Count = 1, queue2Count = 2;
- HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES,
- String.valueOf(maxConcurrent), DriverConfig.QUEUE_MAX_CONCURRENT,
- queue1 + "=" + String.valueOf(queue1Count) + "," + queue2 + "=" + String.valueOf(queue2Count));
- List<QueryHandle> handleList = new ArrayList<>();
-
- Util.changeConfig(map, hiveDriverConf);
- lens.restart();
-
- sHelper.setAndValidateParam(LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue1);
- handleList.add((QueryHandle) qHelper.executeQuery(COST_60).getData());
- handleList.add((QueryHandle) qHelper.executeQuery(COST_95).getData());
-
- sHelper.setAndValidateParam(LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue2);
- handleList.add((QueryHandle) qHelper.executeQuery(COST_60).getData());
- handleList.add((QueryHandle) qHelper.executeQuery(COST_95).getData());
- handleList.add((QueryHandle) qHelper.executeQuery(COST_95).getData());
-
- List<QueryStatus.Status> statusList = new ArrayList<>();
- for (QueryHandle handle : handleList){
- statusList.add(qHelper.getQueryStatus(handle).getStatus());
- }
-
- Assert.assertEquals(statusList.get(0), QueryStatus.Status.RUNNING);
- Assert.assertEquals(statusList.get(1), QueryStatus.Status.QUEUED);
- Assert.assertEquals(statusList.get(2), QueryStatus.Status.RUNNING);
- Assert.assertEquals(statusList.get(3), QueryStatus.Status.RUNNING);
- Assert.assertEquals(statusList.get(4), QueryStatus.Status.QUEUED);
-
- qHelper.waitForCompletion(handleList.get(0));
- Assert.assertEquals(qHelper.getQueryStatus(handleList.get(1)).getStatus(), QueryStatus.Status.RUNNING);
-
- qHelper.waitForCompletion(handleList.get(2));
- Assert.assertEquals(qHelper.getQueryStatus(handleList.get(4)).getStatus(), QueryStatus.Status.RUNNING);
- }
-
- // LENS-1027
- @Test(enabled = true)
- public void queueDefaultThresholdConstraint() throws Exception {
-
- int maxConcurrent = 5, queue1Count = 1, queue2Count = 2;
- HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES,
- String.valueOf(maxConcurrent), DriverConfig.QUEUE_MAX_CONCURRENT,
- "*=" + String.valueOf(queue1Count) + "," + queue2 + "=" + String.valueOf(queue2Count));
- List<QueryHandle> handleList = new ArrayList<>();
-
- Util.changeConfig(map, hiveDriverConf);
- lens.restart();
-
- sHelper.setAndValidateParam(LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue1);
- handleList.add((QueryHandle) qHelper.executeQuery(COST_60).getData());
- handleList.add((QueryHandle) qHelper.executeQuery(COST_95).getData());
-
- sHelper.setAndValidateParam(LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue2);
- handleList.add((QueryHandle) qHelper.executeQuery(COST_60).getData());
- handleList.add((QueryHandle) qHelper.executeQuery(COST_95).getData());
- handleList.add((QueryHandle) qHelper.executeQuery(COST_95).getData());
-
- Thread.sleep(2000);
-
- List<QueryStatus.Status> statusList = new ArrayList<>();
- for (QueryHandle handle : handleList){
- statusList.add(qHelper.getQueryStatus(handle).getStatus());
- }
-
- Assert.assertEquals(statusList.get(0), QueryStatus.Status.RUNNING);
- Assert.assertEquals(statusList.get(1), QueryStatus.Status.QUEUED);
- Assert.assertEquals(statusList.get(2), QueryStatus.Status.RUNNING);
- Assert.assertEquals(statusList.get(3), QueryStatus.Status.RUNNING);
- Assert.assertEquals(statusList.get(4), QueryStatus.Status.QUEUED);
- }
-
-
- @Test(enabled = true)
- public void enableQueueThrottlingWithExistingQueuedQueries() throws Exception {
-
- long timeToWait= 5 * SECONDS_IN_A_MINUTE; // in seconds
- int sleepTime = 5, maxConcurrent = 4, queue1Concurrent = 1, queue2Concurrent = 2;
- HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES,
- String.valueOf(maxConcurrent), DriverConfig.QUEUE_MAX_CONCURRENT,
- queue1 + "=" + String.valueOf(queue1Concurrent) + "," + queue2 + "=" + String.valueOf(queue2Concurrent));
-
- sHelper.setAndValidateParam(session1, LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue1);
- sHelper.setAndValidateParam(session2, LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue2);
-
- List<QueryHandle> handleList = new ArrayList<QueryHandle>();
- for (int i = 1; i <= 3; i++) {
- handleList.add((QueryHandle)qHelper.executeQuery(COST_95).getData());
- handleList.add((QueryHandle)qHelper.executeQuery(COST_20, "", session1).getData());
- handleList.add((QueryHandle)qHelper.executeQuery(COST_95, "", session2).getData());
- }
-
- Util.changeConfig(map, hiveDriverConf);
- lens.restart();
-
- for (int i = 1; i <= 2; i++) {
- handleList.add((QueryHandle)qHelper.executeQuery(COST_95).getData());
- handleList.add((QueryHandle)qHelper.executeQuery(COST_20, "", session1).getData());
- handleList.add((QueryHandle)qHelper.executeQuery(COST_95, "", session2).getData());
- }
-
- List<QueryHandle> running = null, queued = null;
- for (int t = 0; t < timeToWait; t = t + sleepTime) {
-
- running = qHelper.getQueryHandleList(null, "RUNNING", "all", sessionHandleString, null, null, hiveDriver);
- queued = qHelper.getQueryHandleList(null, "QUEUED", "all", sessionHandleString, null, null, hiveDriver);
- logger.info("Running query count : " + running.size() + "\t Queued query count : " + queued.size());
-
- if (running.isEmpty() && queued.isEmpty()) {
- break;
- }
- Assert.assertTrue(running.size() <= maxConcurrent);
-
- int queue1Count = 0, queue2Count = 0;
- for (QueryHandle qh : running) {
- String queue = qHelper.getLensQuery(sessionHandleString, qh).getQueryConf().getProperties()
- .get("mapreduce.job.queuename");
- Assert.assertNotNull(queue);
-
- if (queue.equals(queue1)) {
- queue1Count++;
- } else if (queue.equals(queue2)) {
- queue2Count++;
- }
- }
-
- Assert.assertTrue(queue1Count <= queue1Concurrent, "queue1 count : " + queue1Count);
- Assert.assertTrue(queue2Count <= queue2Concurrent, "queue2 count : " + queue2Count);
- TimeUnit.SECONDS.sleep(sleepTime);
- }
-
- Assert.assertTrue(running.isEmpty());
- Assert.assertTrue(queued.isEmpty());
-
- for(QueryHandle q: handleList){
- LensQuery lq = qHelper.waitForCompletion(q);
- Assert.assertEquals(lq.getStatus().getStatus(), QueryStatus.Status.SUCCESSFUL);
- }
- }
-
-
- @Test(enabled = true)
- public void queueAndPriorityMaxConcurrent() throws Exception {
-
- HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, "5",
- DriverConfig.PRIORITY_MAX_CONCURRENT, "LOW=2,VERY_LOW=1",
- DriverConfig.QUEUE_MAX_CONCURRENT, queue1 + "=1," + queue2 + "=2");
-
- Util.changeConfig(map, hiveDriverConf);
- lens.restart();
-
- sHelper.setAndValidateParam(session1, LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue1);
- sHelper.setAndValidateParam(session2, LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue2);
-
- QueryStatus.Status[] expectedStatus = {QueryStatus.Status.RUNNING, QueryStatus.Status.QUEUED,
- QueryStatus.Status.QUEUED, QueryStatus.Status.RUNNING, QueryStatus.Status.RUNNING,
- QueryStatus.Status.RUNNING, QueryStatus.Status.QUEUED, QueryStatus.Status.QUEUED, };
-
- List<QueryHandle> handleList = new ArrayList<>();
- handleList.add((QueryHandle) qHelper.executeQuery(COST_95, null, session1).getData());
- handleList.add((QueryHandle) qHelper.executeQuery(COST_20, null, session1).getData());
- handleList.add((QueryHandle) qHelper.executeQuery(COST_95, null, session2).getData());
- handleList.add((QueryHandle) qHelper.executeQuery(COST_60, null, session2).getData());
- handleList.add((QueryHandle) qHelper.executeQuery(COST_5, null, session2).getData());
- handleList.add((QueryHandle) qHelper.executeQuery(COST_60).getData());
- handleList.add((QueryHandle) qHelper.executeQuery(COST_20, null, session2).getData());
- handleList.add((QueryHandle) qHelper.executeQuery(COST_5, null, session2).getData());
-
- List<QueryStatus> statusList = new ArrayList<>();
- for(QueryHandle handle: handleList){
- statusList.add(qHelper.getQueryStatus(handle));
- }
-
- for(int i=0; i<expectedStatus.length; i++){
- Assert.assertEquals(statusList.get(i).getStatus(), expectedStatus[i], "failed : query-" + i);
- }
- }
-
-
- @Test(enabled = true)
- public void queueAndPriorityMaxConcurrentMany() throws Exception {
-
- long timeToWait= 5 * SECONDS_IN_A_MINUTE; // in seconds
- int sleepTime = 5, maxConcurrent = 5, queue1Concurrent = 1, queue2Concurrent = 2, priority1 = 2, priority2 = 1;
-
- HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES,
- String.valueOf(maxConcurrent), DriverConfig.PRIORITY_MAX_CONCURRENT,
- "HIGH=" + String.valueOf(priority1) + ",NORMAL=" + String.valueOf(priority2),
- DriverConfig.QUEUE_MAX_CONCURRENT,
- queue1 + "=" + String.valueOf(queue1Concurrent) + "," + queue2 + "=" + String.valueOf(queue2Concurrent));
-
- Util.changeConfig(map, hiveDriverConf);
- lens.restart();
-
- sHelper.setAndValidateParam(session1, LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue1);
- sHelper.setAndValidateParam(session2, LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue2);
-
- List<QueryHandle> handleList = new ArrayList<QueryHandle>();
- for (int i = 1; i <= 3; i++) {
- handleList.add((QueryHandle) qHelper.executeQuery(COST_5).getData());
- handleList.add((QueryHandle) qHelper.executeQuery(COST_20, "", session1).getData());
- handleList.add((QueryHandle) qHelper.executeQuery(COST_60, "", session2).getData());
- handleList.add((QueryHandle) qHelper.executeQuery(COST_20).getData());
- handleList.add((QueryHandle) qHelper.executeQuery(COST_95, "", session1).getData());
- handleList.add((QueryHandle) qHelper.executeQuery(COST_5, "", session2).getData());
- }
-
- List<QueryHandle> running = null, queued = null;
- for (int t = 0; t < timeToWait; t = t + sleepTime) {
-
- running = qHelper.getQueryHandleList(null, "RUNNING", "all", sessionHandleString, null, null, hiveDriver);
- queued = qHelper.getQueryHandleList(null, "QUEUED", "all", sessionHandleString, null, null, hiveDriver);
- logger.info("Running query count : " + running.size() + "\t Queued query count : " + queued.size());
-
- if (running.isEmpty() && queued.isEmpty()) {
- break;
- }
-
- Assert.assertTrue(running.size() <= maxConcurrent);
-
- int pCount1 = 0, pCount2 = 0, queue1Count = 0, queue2Count = 0;
- for (QueryHandle qh : running) {
- Priority priority = qHelper.getLensQuery(sessionHandleString, qh).getPriority();
- String queue = qHelper.getLensQuery(sessionHandleString, qh).getQueryConf().getProperties()
- .get("mapreduce.job.queuename");
-
- Assert.assertNotNull(priority);
- Assert.assertNotNull(queue);
-
- if (priority.equals(Priority.LOW)){
- pCount1++;
- } else if (priority.equals(Priority.VERY_LOW)){
- pCount2++;
- }
-
- if (queue.equals(queue1)){
- queue1Count++;
- } else if (queue.equals(queue2)) {
- queue2Count++;
- }
- }
-
- Assert.assertTrue(pCount1 <= priority1, "proirty-1 count : " + pCount1);
- Assert.assertTrue(pCount2 <= priority2, "priority-2 count : " + pCount2);
- Assert.assertTrue(queue1Count <= queue1Concurrent, "queue-1 count : " + queue1Count);
- Assert.assertTrue(queue2Count <= queue2Concurrent, "queue-2 count : " + queue2Count);
-
- TimeUnit.SECONDS.sleep(sleepTime);
- }
-
- Assert.assertTrue(queued.isEmpty());
- Assert.assertTrue(running.isEmpty());
-
- for (QueryHandle q : handleList) {
- LensQuery lq = qHelper.waitForCompletion(q);
- Assert.assertEquals(lq.getStatus().getStatus(), QueryStatus.Status.SUCCESSFUL);
- }
- }
-
- /*
- LENS-973. Scenario is mentioned in jira
- */
-
- @Test(enabled = true)
- public void queueConstraintFailureOnRestart() throws Exception {
-
- List<QueryHandle> handleList = new ArrayList<QueryHandle>();
-
- HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, "3",
- DriverConfig.QUEUE_MAX_CONCURRENT, queue1 + "=1," + queue2 + "=3");
- Util.changeConfig(map, hiveDriverConf);
- lens.restart();
-
- String newSession = sHelper.openSession("user", "pwd", lens.getCurrentDB());
- sHelper.setAndValidateParam(newSession, LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue2);
- handleList.add((QueryHandle) qHelper.executeQuery(QueryInventory.SLEEP_QUERY, null, newSession).getData());
-
- for(int i=0; i<2; i++){
- handleList.add((QueryHandle) qHelper.executeQuery(QueryInventory.HIVE_CUBE_QUERY).getData());
- }
-
- sHelper.closeSession(newSession);
- lens.restart();
- Assert.assertFalse(qHelper.getQueryStatus(handleList.get(0)).finished());
-
- for(int i=0; i<6; i++){
- handleList.add((QueryHandle) qHelper.executeQuery(QueryInventory.HIVE_DIM_QUERY).getData());
- }
-
- for(QueryHandle handle: handleList){
- LensQuery lq = qHelper.waitForCompletion(handle);
- Assert.assertEquals(lq.getStatus().getStatus(), QueryStatus.Status.SUCCESSFUL);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/lens/blob/2aaf6e0a/lens-server/src/main/java/org/apache/lens/server/metastore/CubeMetastoreServiceImpl.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/metastore/CubeMetastoreServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/metastore/CubeMetastoreServiceImpl.java
index 8b10d1d..24660e1 100644
--- a/lens-server/src/main/java/org/apache/lens/server/metastore/CubeMetastoreServiceImpl.java
+++ b/lens-server/src/main/java/org/apache/lens/server/metastore/CubeMetastoreServiceImpl.java
@@ -238,7 +238,7 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet
JAXBUtils.dumpPeriodsFromStorageTables(xDimTable.getStorageTables());
Map<String, String> properties = JAXBUtils.mapFromXProperties(xDimTable.getProperties());
- Map<String, StorageTableDesc> storageDesc = JAXBUtils.storageTableMapFromXStorageTables(
+ Map<String, StorageTableDesc> storageDesc = JAXBUtils.tableDescPrefixMapFromXStorageTables(
xDimTable.getStorageTables());
try (SessionContext ignored = new SessionContext(sessionid)){
@@ -289,7 +289,7 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet
try (SessionContext ignored = new SessionContext(sessionid)){
getClient(sessionid).alterCubeDimensionTable(dimensionTable.getTableName(),
JAXBUtils.cubeDimTableFromDimTable(dimensionTable),
- JAXBUtils.storageTableMapFromXStorageTables(dimensionTable.getStorageTables()));
+ JAXBUtils.tableDescPrefixMapFromXStorageTables(dimensionTable.getStorageTables()));
log.info("Updated dimension table " + dimensionTable.getTableName());
} catch (HiveException exc) {
throw new LensException(exc);
@@ -398,15 +398,38 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet
CubeMetastoreClient msClient = getClient(sessionid);
CubeFactTable cft = msClient.getFactTable(fact);
XFactTable factTable = JAXBUtils.factTableFromCubeFactTable(cft);
+ Map<String, Map<UpdatePeriod, String>> storageMap = cft.getStoragePrefixUpdatePeriodMap();
for (String storageName : cft.getStorages()) {
Set<UpdatePeriod> updatePeriods = cft.getUpdatePeriods().get(storageName);
- XStorageTableElement tblElement = JAXBUtils.getXStorageTableFromHiveTable(
- msClient.getHiveTable(MetastoreUtil.getFactOrDimtableStorageTableName(fact, storageName)));
- tblElement.setStorageName(storageName);
- for (UpdatePeriod p : updatePeriods) {
- tblElement.getUpdatePeriods().getUpdatePeriod().add(XUpdatePeriod.valueOf(p.name()));
+ // This map tells whether different update periods of this storage are kept in different tables.
+ Map<UpdatePeriod, String> updatePeriodToTableMap = storageMap.get(storageName);
+ Set<String> tableNames = new HashSet<>();
+ for (UpdatePeriod updatePeriod : updatePeriods) {
+ tableNames.add(updatePeriodToTableMap.get(updatePeriod));
+ }
+ if (tableNames.size() <= 1) {
+ XStorageTableElement tblElement = JAXBUtils.getXStorageTableFromHiveTable(
+ msClient.getHiveTable(MetastoreUtil.getFactOrDimtableStorageTableName(fact, storageName)));
+ tblElement.setStorageName(storageName);
+ for (UpdatePeriod p : updatePeriods) {
+ tblElement.getUpdatePeriods().getUpdatePeriod().add(XUpdatePeriod.valueOf(p.name()));
+ }
+ factTable.getStorageTables().getStorageTable().add(tblElement);
+ } else {
+ // Multiple storage tables.
+ XStorageTableElement tblElement = new XStorageTableElement();
+ tblElement.setStorageName(storageName);
+ XUpdatePeriods xUpdatePeriods = new XUpdatePeriods();
+ tblElement.setUpdatePeriods(xUpdatePeriods);
+ for (Map.Entry entry : updatePeriodToTableMap.entrySet()) {
+ XUpdatePeriodTableDescriptor updatePeriodTableDescriptor = new XUpdatePeriodTableDescriptor();
+ updatePeriodTableDescriptor.setTableDesc(getStorageTableDescFromHiveTable(
+ msClient.getHiveTable(MetastoreUtil.getFactOrDimtableStorageTableName(fact, (String) entry.getValue()))));
+ updatePeriodTableDescriptor.setUpdatePeriod(XUpdatePeriod.valueOf(((UpdatePeriod)entry.getKey()).name()));
+ xUpdatePeriods.getUpdatePeriodTableDescriptor().add(updatePeriodTableDescriptor);
+ }
+ factTable.getStorageTables().getStorageTable().add(tblElement);
}
- factTable.getStorageTables().getStorageTable().add(tblElement);
}
return factTable;
}
@@ -431,7 +454,8 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet
JAXBUtils.getFactUpdatePeriodsFromStorageTables(fact.getStorageTables()),
fact.getWeight(),
addFactColStartTimePropertyToFactProperties(fact),
- JAXBUtils.storageTableMapFromXStorageTables(fact.getStorageTables()));
+ JAXBUtils.tableDescPrefixMapFromXStorageTables(fact.getStorageTables()),
+ JAXBUtils.storageTablePrefixMapOfStorage(fact.getStorageTables()));
log.info("Created fact table " + fact.getName());
}
}
@@ -460,7 +484,7 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet
public void updateFactTable(LensSessionHandle sessionid, XFactTable fact) throws LensException {
try (SessionContext ignored = new SessionContext(sessionid)){
getClient(sessionid).alterCubeFactTable(fact.getName(), JAXBUtils.cubeFactFromFactTable(fact),
- JAXBUtils.storageTableMapFromXStorageTables(fact.getStorageTables()),
+ JAXBUtils.tableDescPrefixMapFromXStorageTables(fact.getStorageTables()),
JAXBUtils.columnStartAndEndTimeFromXColumns(fact.getColumns()));
log.info("Updated fact table " + fact.getName());
} catch (HiveException e) {
@@ -587,11 +611,13 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet
for (XUpdatePeriod sup : storageTable.getUpdatePeriods().getUpdatePeriod()) {
updatePeriods.add(UpdatePeriod.valueOf(sup.name()));
}
- try (SessionContext ignored = new SessionContext(sessionid)){
+ try (SessionContext ignored = new SessionContext(sessionid)) {
CubeMetastoreClient msClient = getClient(sessionid);
- msClient.addStorage(msClient.getFactTable(fact),
- storageTable.getStorageName(), updatePeriods,
- JAXBUtils.storageTableDescFromXStorageTableElement(storageTable));
+ XStorageTables tables = new XStorageTables();
+ tables.getStorageTable().add(storageTable);
+ msClient.addStorage(msClient.getFactTable(fact), storageTable.getStorageName(), updatePeriods,
+ JAXBUtils.tableDescPrefixMapFromXStorageTables(tables),
+ JAXBUtils.storageTablePrefixMapOfStorage(tables).get(storageTable.getStorageName()));
log.info("Added storage " + storageTable.getStorageName() + ":" + updatePeriods + " for fact " + fact);
}
}
@@ -615,17 +641,34 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet
return factTable;
}
+ /**
+ * Resolves the names of the storage tables that back the given storage of a fact or dimension table.
+ *
+ * For a fact, one name is derived from every prefix registered for the storage in the fact's
+ * storage-prefix/update-period map, so a storage with a separate table per update period yields
+ * several names. For any other table, and for a fact that has no prefix mapping for the storage,
+ * the single name from MetastoreUtil.getFactOrDimtableStorageTableName is returned.
+ *
+ * @param sessionHandle session used to obtain the metastore client
+ * @param fact name of the fact (or dimension) table
+ * @param storageName name of the storage
+ * @return the distinct storage table names
+ * @throws LensException propagated from the metastore client calls
+ */
+ private Set<String> getAllTablesForStorage(LensSessionHandle sessionHandle, String fact, String storageName)
+ throws LensException {
+ CubeMetastoreClient client = getClient(sessionHandle);
+ Set<String> storageTableNames = new HashSet<>();
+ Map<UpdatePeriod, String> prefixByPeriod = null;
+ if (client.isFactTable(fact)) {
+ prefixByPeriod = client.getCubeFact(fact).getStoragePrefixUpdatePeriodMap().get(storageName);
+ }
+ if (prefixByPeriod == null) {
+ // Not a fact, or no per-update-period mapping for this storage: use the single-table name
+ // rather than failing with a NullPointerException on the missing map.
+ storageTableNames.add(MetastoreUtil.getFactOrDimtableStorageTableName(fact, storageName));
+ return storageTableNames;
+ }
+ // Only the prefixes matter here; iterating values() avoids the raw Map.Entry and the cast.
+ for (String prefix : prefixByPeriod.values()) {
+ storageTableNames.add(MetastoreUtil.getStorageTableName(fact, Storage.getPrefix(prefix)));
+ }
+ return storageTableNames;
+ }
+
@Override
- public XPartitionList getAllPartitionsOfFactStorage(
- LensSessionHandle sessionid, String fact, String storageName,
+ public XPartitionList getAllPartitionsOfFactStorage(LensSessionHandle sessionid, String fact, String storageName,
String filter) throws LensException {
- try (SessionContext ignored = new SessionContext(sessionid)){
+ try (SessionContext ignored = new SessionContext(sessionid)) {
checkFactStorage(sessionid, fact, storageName);
CubeMetastoreClient client = getClient(sessionid);
- String storageTableName = MetastoreUtil.getFactOrDimtableStorageTableName(fact,
- storageName);
- List<Partition> parts = client.getPartitionsByFilter(storageTableName, filter);
- List<String> timePartCols = client.getTimePartColNamesOfTable(storageTableName);
+ Set<String> storageTableNames = getAllTablesForStorage(sessionid, fact, storageName);
+ List<Partition> parts = new ArrayList<>();
+ List<String> timePartCols = new ArrayList<>();
+ for (String storageTableName : storageTableNames) {
+ parts.addAll(client.getPartitionsByFilter(storageTableName, filter));
+ timePartCols.addAll(client.getTimePartColNamesOfTable(storageTableName));
+ }
return xpartitionListFromPartitionList(fact, parts, timePartCols);
} catch (HiveException exc) {
throw new LensException(exc);
@@ -635,10 +678,10 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet
@Override
public int addPartitionToFactStorage(LensSessionHandle sessionid, String fact, String storageName,
XPartition partition) throws LensException {
- try (SessionContext ignored = new SessionContext(sessionid)){
+ try (SessionContext ignored = new SessionContext(sessionid)) {
checkFactStorage(sessionid, fact, storageName);
- return getClient(sessionid).addPartition(storagePartSpecFromXPartition(partition), storageName,
- CubeTableType.FACT).size();
+ return getClient(sessionid)
+ .addPartition(storagePartSpecFromXPartition(partition), storageName, CubeTableType.FACT).size();
} catch (HiveException exc) {
throw new LensException(exc);
}
@@ -647,10 +690,10 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet
@Override
public int addPartitionsToFactStorage(LensSessionHandle sessionid, String fact, String storageName,
XPartitionList partitions) throws LensException {
- try (SessionContext ignored = new SessionContext(sessionid)){
+ try (SessionContext ignored = new SessionContext(sessionid)) {
checkFactStorage(sessionid, fact, storageName);
- return getClient(sessionid).addPartitions(storagePartSpecListFromXPartitionList(partitions), storageName,
- CubeTableType.FACT).size();
+ return getClient(sessionid)
+ .addPartitions(storagePartSpecListFromXPartitionList(partitions), storageName, CubeTableType.FACT).size();
} catch (HiveException exc) {
throw new LensException(exc);
}
@@ -693,15 +736,17 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet
}
@Override
- public void updatePartition(LensSessionHandle sessionid, String tblName, String storageName,
- XPartition xPartition) throws LensException {
- try (SessionContext ignored = new SessionContext(sessionid)){
+ public void updatePartition(LensSessionHandle sessionid, String tblName, String storageName, XPartition xPartition)
+ throws LensException {
+ try (SessionContext ignored = new SessionContext(sessionid)) {
CubeMetastoreClient client = getClient(sessionid);
- String storageTableName = MetastoreUtil.getFactOrDimtableStorageTableName(tblName, storageName);
+ String storageTableName = client
+ .getStorageTableName(tblName, storageName, UpdatePeriod.valueOf(xPartition.getUpdatePeriod().name()));
Partition existingPartition = client.getPartitionByFilter(storageTableName,
StorageConstants.getPartFilter(JAXBUtils.getFullPartSpecAsMap(xPartition)));
JAXBUtils.updatePartitionFromXPartition(existingPartition, xPartition);
- client.updatePartition(tblName, storageName, existingPartition);
+ client.updatePartition(tblName, storageName, existingPartition,
+ UpdatePeriod.valueOf(xPartition.getUpdatePeriod().value()));
} catch (HiveException | ClassNotFoundException | InvalidOperationException | UnsupportedOperationException exc) {
throw new LensException(exc);
}
@@ -710,15 +755,23 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet
@Override
public void updatePartitions(LensSessionHandle sessionid, String tblName, String storageName,
XPartitionList xPartitions) throws LensException {
- try (SessionContext ignored = new SessionContext(sessionid)){
+ try (SessionContext ignored = new SessionContext(sessionid)) {
CubeMetastoreClient client = getClient(sessionid);
- String storageTableName = MetastoreUtil.getFactOrDimtableStorageTableName(tblName, storageName);
- List<Partition> partitionsToUpdate = new ArrayList<>(xPartitions.getPartition().size());
- for (XPartition xPartition : xPartitions.getPartition()) {
- Partition existingPartition = client.getPartitionByFilter(storageTableName,
- StorageConstants.getPartFilter(JAXBUtils.getFullPartSpecAsMap(xPartition)));
- JAXBUtils.updatePartitionFromXPartition(existingPartition, xPartition);
- partitionsToUpdate.add(existingPartition);
+ Set<String> storageTableNames = getAllTablesForStorage(sessionid, tblName, storageName);
+ Map<UpdatePeriod, List<Partition>> partitionsToUpdate = new HashMap<>();
+ for (String storageTableName : storageTableNames) {
+ for (XPartition xPartition : xPartitions.getPartition()) {
+ Partition existingPartition = client.getPartitionByFilter(storageTableName,
+ StorageConstants.getPartFilter(JAXBUtils.getFullPartSpecAsMap(xPartition)));
+ JAXBUtils.updatePartitionFromXPartition(existingPartition, xPartition);
+ UpdatePeriod updatePeriod = UpdatePeriod.valueOf(xPartition.getUpdatePeriod().value());
+ List<Partition> partitionList = partitionsToUpdate.get(updatePeriod);
+ if (partitionList == null) {
+ partitionList = new ArrayList<>();
+ partitionsToUpdate.put(updatePeriod, partitionList);
+ }
+ partitionList.add(existingPartition);
+ }
}
client.updatePartitions(tblName, storageName, partitionsToUpdate);
} catch (HiveException | ClassNotFoundException | InvalidOperationException exc) {
@@ -787,29 +840,35 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet
return period;
}
- public void dropPartitionFromStorageByValues(LensSessionHandle sessionid,
- String cubeTableName, String storageName, String values) throws LensException {
- try (SessionContext ignored = new SessionContext(sessionid)){
- String tableName = MetastoreUtil.getStorageTableName(cubeTableName,
- Storage.getPrefix(storageName));
+ public void dropPartitionFromStorageByValues(LensSessionHandle sessionid, String cubeTableName, String storageName,
+ String values) throws LensException {
+ try (SessionContext ignored = new SessionContext(sessionid)) {
+ Set<String> storageTables = getAllTablesForStorage(sessionid, cubeTableName, storageName);
+ Map<String, List<Partition>> partitions = new HashMap<>();
CubeMetastoreClient msClient = getClient(sessionid);
- String filter = getFilter(msClient, tableName, values);
- List<Partition> partitions = msClient.getPartitionsByFilter(
- tableName, filter);
- if (partitions.size() > 1) {
- log.error("More than one partition with specified values, correspoding filter:" + filter);
- throw new BadRequestException("More than one partition with specified values");
- } else if (partitions.size() == 0) {
- log.error("No partition exists with specified values, correspoding filter:" + filter);
+ int totalPartitions = 0;
+ Partition part = null;
+ for (String tableName : storageTables) {
+ String filter = getFilter(msClient, tableName, values);
+ partitions.put(filter, msClient.getPartitionsByFilter(tableName, filter));
+ if (partitions.get(filter).size() > 1) {
+ log.error("More than one partition with specified values, corresponding filter:" + filter);
+ throw new BadRequestException("More than one partition with specified values");
+ }
+ if (partitions.get(filter).size() == 1) {
+ part = partitions.get(filter).get(0);
+ }
+ totalPartitions += partitions.get(filter).size();
+ }
+ if (totalPartitions == 0) {
+ log.error("No partition exists with specified values");
throw new NotFoundException("No partition exists with specified values");
}
Map<String, Date> timeSpec = new HashMap<>();
Map<String, String> nonTimeSpec = new HashMap<>();
- UpdatePeriod updatePeriod = populatePartSpec(partitions.get(0), timeSpec, nonTimeSpec);
- msClient.dropPartition(cubeTableName,
- storageName, timeSpec, nonTimeSpec, updatePeriod);
- log.info("Dropped partition for dimension: " + cubeTableName
- + " storage: " + storageName + " values:" + values);
+ UpdatePeriod updatePeriod = populatePartSpec(part, timeSpec, nonTimeSpec);
+ msClient.dropPartition(cubeTableName, storageName, timeSpec, nonTimeSpec, updatePeriod);
+ log.info("Dropped partition for dimension: " + cubeTableName + " storage: " + storageName + " values:" + values);
} catch (HiveException exc) {
throw new LensException(exc);
}
@@ -818,9 +877,12 @@ public class CubeMetastoreServiceImpl extends BaseLensService implements CubeMet
public void dropPartitionFromStorageByFilter(LensSessionHandle sessionid, String cubeTableName,
String storageName, String filter) throws LensException {
try (SessionContext ignored = new SessionContext(sessionid)){
- String tableName = MetastoreUtil.getStorageTableName(cubeTableName, Storage.getPrefix(storageName));
+ Set<String> storageTables = getAllTablesForStorage(sessionid, cubeTableName, storageName);
+ List<Partition> partitions = new ArrayList<>();
CubeMetastoreClient msClient = getClient(sessionid);
- List<Partition> partitions = msClient.getPartitionsByFilter(tableName, filter);
+ for (String tableName : storageTables) {
+ partitions.addAll(msClient.getPartitionsByFilter(tableName, filter));
+ }
for (Partition part : partitions) {
try {
Map<String, Date> timeSpec = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/lens/blob/2aaf6e0a/lens-server/src/main/java/org/apache/lens/server/metastore/JAXBUtils.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/metastore/JAXBUtils.java b/lens-server/src/main/java/org/apache/lens/server/metastore/JAXBUtils.java
index 51fcb43..7d54c7b 100644
--- a/lens-server/src/main/java/org/apache/lens/server/metastore/JAXBUtils.java
+++ b/lens-server/src/main/java/org/apache/lens/server/metastore/JAXBUtils.java
@@ -45,7 +45,6 @@ import org.apache.hadoop.mapred.InputFormat;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
-
import lombok.extern.slf4j.Slf4j;
/**
@@ -588,14 +587,22 @@ public final class JAXBUtils {
return cols;
}
- public static Map<String, Set<UpdatePeriod>> getFactUpdatePeriodsFromStorageTables(
- XStorageTables storageTables) {
+ public static Map<String, Set<UpdatePeriod>> getFactUpdatePeriodsFromStorageTables(XStorageTables storageTables) {
if (storageTables != null && !storageTables.getStorageTable().isEmpty()) {
Map<String, Set<UpdatePeriod>> factUpdatePeriods = new LinkedHashMap<String, Set<UpdatePeriod>>();
for (XStorageTableElement ste : storageTables.getStorageTable()) {
- Set<UpdatePeriod> updatePeriods = new TreeSet<UpdatePeriod>();
- for (XUpdatePeriod upd : ste.getUpdatePeriods().getUpdatePeriod()) {
+ Set<UpdatePeriod> updatePeriods = new TreeSet<>();
+ // Check if the update period array is empty.
+ List<XUpdatePeriod> updatePeriodList = ste.getUpdatePeriods().getUpdatePeriod();
+ if (updatePeriodList.isEmpty()) {
+ List<XUpdatePeriodTableDescriptor> tableDescriptorList = ste.getUpdatePeriods()
+ .getUpdatePeriodTableDescriptor();
+ for (XUpdatePeriodTableDescriptor tableDescriptor : tableDescriptorList) {
+ updatePeriodList.add(tableDescriptor.getUpdatePeriod());
+ }
+ }
+ for (XUpdatePeriod upd : updatePeriodList) {
updatePeriods.add(UpdatePeriod.valueOf(upd.name()));
}
factUpdatePeriods.put(ste.getStorageName(), updatePeriods);
@@ -706,13 +713,10 @@ public final class JAXBUtils {
Map<String, Set<UpdatePeriod>> storageUpdatePeriods = getFactUpdatePeriodsFromStorageTables(
fact.getStorageTables());
-
- return new CubeFactTable(fact.getCubeName(),
- fact.getName(),
- columns,
- storageUpdatePeriods,
- fact.getWeight(),
- mapFromXProperties(fact.getProperties()));
+ Map<String, Map<UpdatePeriod, String>> storageTablePrefixMap = storageTablePrefixMapOfStorage(
+ fact.getStorageTables());
+ return new CubeFactTable(fact.getCubeName(), fact.getName(), columns, storageUpdatePeriods, fact.getWeight(),
+ mapFromXProperties(fact.getProperties()), storageTablePrefixMap);
}
public static Segmentation segmentationFromXSegmentation(XSegmentation seg) throws LensException {
@@ -849,11 +853,45 @@ public final class JAXBUtils {
return tblDesc;
}
- public static Map<String, StorageTableDesc> storageTableMapFromXStorageTables(XStorageTables storageTables) {
- Map<String, StorageTableDesc> storageTableMap = new HashMap<String, StorageTableDesc>();
+ /**
+ * Builds a map from storage table prefix to the table descriptor declared for that prefix.
+ *
+ * A storage element that carries update-period table descriptors contributes one entry per
+ * descriptor, keyed by {@code updatePeriod + "_" + storageName}. Any other element contributes
+ * a single entry keyed by its storage name.
+ *
+ * @param storageTables the storage tables element; may be null
+ * @return map of prefix to descriptor; empty when there are no storage tables
+ */
+ public static Map<String, StorageTableDesc> tableDescPrefixMapFromXStorageTables(XStorageTables storageTables) {
+ Map<String, StorageTableDesc> descByPrefix = new HashMap<>();
+ if (storageTables == null || storageTables.getStorageTable().isEmpty()) {
+ return descByPrefix;
+ }
+ for (XStorageTableElement element : storageTables.getStorageTable()) {
+ List<XUpdatePeriodTableDescriptor> descriptors = element.getUpdatePeriods() == null
+ ? null : element.getUpdatePeriods().getUpdatePeriodTableDescriptor();
+ if (descriptors == null || descriptors.isEmpty()) {
+ // No per-update-period descriptors: the element itself describes the one storage table.
+ descByPrefix.put(element.getStorageName(), storageTableDescFromXStorageTableElement(element));
+ continue;
+ }
+ for (XUpdatePeriodTableDescriptor descriptor : descriptors) {
+ // The key is the update period followed by "_" and the storage name.
+ String prefix = descriptor.getUpdatePeriod() + "_" + element.getStorageName();
+ descByPrefix.put(prefix, storageTableDescFromXStorageTableDesc(descriptor.getTableDesc()));
+ }
+ }
+ return descByPrefix;
+ }
+
+ public static Map<String, Map<UpdatePeriod, String>> storageTablePrefixMapOfStorage(XStorageTables storageTables) {
+ Map<String, Map<UpdatePeriod, String>> storageTableMap = new HashMap<>();
if (storageTables != null && !storageTables.getStorageTable().isEmpty()) {
for (XStorageTableElement sTbl : storageTables.getStorageTable()) {
- storageTableMap.put(sTbl.getStorageName(), storageTableDescFromXStorageTableElement(sTbl));
+ Map<UpdatePeriod, String> storageNameMap = new HashMap<>();
+ if (sTbl.getUpdatePeriods() != null && sTbl.getUpdatePeriods().getUpdatePeriodTableDescriptor() != null && !sTbl
+ .getUpdatePeriods().getUpdatePeriodTableDescriptor().isEmpty()) {
+ for (XUpdatePeriodTableDescriptor updatePeriodTable : sTbl.getUpdatePeriods()
+ .getUpdatePeriodTableDescriptor()) {
+ // Get table name with update period as the prefix.
+ storageNameMap.put(UpdatePeriod.valueOf(updatePeriodTable.getUpdatePeriod().value()),
+ updatePeriodTable.getUpdatePeriod() + "_" + sTbl.getStorageName());
+ }
+ } else {
+ for (XUpdatePeriod updatePeriod : sTbl.getUpdatePeriods().getUpdatePeriod()) {
+ storageNameMap.put(UpdatePeriod.valueOf(updatePeriod.value()), sTbl.getStorageName());
+ }
+ }
+ storageTableMap.put(sTbl.getStorageName(), storageNameMap);
}
}
return storageTableMap;
http://git-wip-us.apache.org/repos/asf/lens/blob/2aaf6e0a/lens-server/src/main/java/org/apache/lens/server/query/LensServerDAO.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/query/LensServerDAO.java b/lens-server/src/main/java/org/apache/lens/server/query/LensServerDAO.java
index dd489e8..cc6ca7d 100644
--- a/lens-server/src/main/java/org/apache/lens/server/query/LensServerDAO.java
+++ b/lens-server/src/main/java/org/apache/lens/server/query/LensServerDAO.java
@@ -139,8 +139,10 @@ public class LensServerDAO {
query.getDriverEndTime(), query.getDriverName(), query.getQueryName(), query.getSubmissionTime(),
query.getDriverQuery(), serializeConf(query.getConf()),
query.getFailedAttempts() == null ? 0 : query.getFailedAttempts().size());
- for (int i = 0; i < query.getFailedAttempts().size(); i++) {
- insertFailedAttempt(runner, conn, query.getHandle(), query.getFailedAttempts().get(i), i);
+ if (query.getFailedAttempts() != null) {
+ for (int i = 0; i < query.getFailedAttempts().size(); i++) {
+ insertFailedAttempt(runner, conn, query.getHandle(), query.getFailedAttempts().get(i), i);
+ }
}
conn.commit();
} finally {
http://git-wip-us.apache.org/repos/asf/lens/blob/2aaf6e0a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
index c76ad24..c6fbeda 100644
--- a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
+++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
@@ -404,8 +404,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE
private void loadQueryComparator() throws LensException {
try {
Class<?>[] classes = conf.getClasses(QUERY_COMPARATOR_CLASSES,
- MoreRetriesFirstComparator.class, QueryPriorityComparator.class,
- FIFOQueryComparator.class, QueryCostComparator.class);
+ MoreRetriesFirstComparator.class, QueryPriorityComparator.class, FIFOQueryComparator.class);
List<Comparator<QueryContext>> comparators = Lists.newArrayList();
for (Class<?> clazz: classes) {
comparators.add(clazz.asSubclass(QueryComparator.class).newInstance());