You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2016/06/20 17:51:43 UTC
[1/2] phoenix git commit: PHOENIX-2950 Implement CRON job to start
async index builds and restart failed jobs (Ravi Kishore Valeti)
Repository: phoenix
Updated Branches:
refs/heads/4.x-HBase-1.1 49c7bd494 -> 56ba37386
PHOENIX-2950 Implement CRON job to start async index builds and restart failed jobs (Ravi Kishore Valeti)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/137feb62
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/137feb62
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/137feb62
Branch: refs/heads/4.x-HBase-1.1
Commit: 137feb62ceeb9ef8cd687da5b3b365e901088b3d
Parents: 49c7bd4
Author: James Taylor <ja...@apache.org>
Authored: Mon Jun 20 10:43:40 2016 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Mon Jun 20 10:51:20 2016 -0700
----------------------------------------------------------------------
.../phoenix/mapreduce/index/IndexTool.java | 2 +-
.../index/automation/PhoenixAsyncIndex.java | 75 +++++
.../index/automation/PhoenixMRJobCallable.java | 73 +++++
.../index/automation/PhoenixMRJobSubmitter.java | 281 +++++++++++++++++++
.../index/automation/YarnApplication.java | 208 ++++++++++++++
.../apache/phoenix/util/PhoenixMRJobUtil.java | 233 +++++++++++++++
.../phoenix/util/ZKBasedMasterElectionUtil.java | 70 +++++
.../index/automated/TestMRJobSubmitter.java | 137 +++++++++
8 files changed, 1078 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/137feb62/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index db8aba9..576dbd3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -97,7 +97,7 @@ public class IndexTool extends Configured implements Tool {
private static final Option OUTPUT_PATH_OPTION = new Option("op", "output-path", true,
"Output path where the files are written");
private static final Option HELP_OPTION = new Option("h", "help", false, "Help");
- private static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_%s_INDX_%s";
+ public static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_%s_INDX_%s";
private Options getOptions() {
final Options options = new Options();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/137feb62/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixAsyncIndex.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixAsyncIndex.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixAsyncIndex.java
new file mode 100644
index 0000000..3e88cd0
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixAsyncIndex.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index.automation;
+
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.schema.PTable.IndexType;
+
+/**
+ * Mutable value holder describing one index that is a candidate for an asynchronous
+ * (MapReduce) build: the index table name, its type, its schema and the data table
+ * it belongs to.
+ */
+public class PhoenixAsyncIndex {
+    private String tableName;
+    private IndexType indexType;
+    private String tableSchem;
+    private String dataTableName;
+
+    /** @return the index table name */
+    public String getTableName() {
+        return tableName;
+    }
+
+    /** @param tableName the index table name */
+    public void setTableName(String tableName) {
+        this.tableName = tableName;
+    }
+
+    /** @return the index type, or {@code null} if it has not been set */
+    public IndexType getIndexType() {
+        return indexType;
+    }
+
+    /** @param indexType the index type */
+    public void setIndexType(IndexType indexType) {
+        this.indexType = indexType;
+    }
+
+    /** @return the schema name */
+    public String getTableSchem() {
+        return tableSchem;
+    }
+
+    /** @param tableSchem the schema name */
+    public void setTableSchem(String tableSchem) {
+        this.tableSchem = tableSchem;
+    }
+
+    /** @return the name of the data table the index is built on */
+    public String getDataTableName() {
+        return dataTableName;
+    }
+
+    /** @param dataTableName the name of the data table the index is built on */
+    public void setDataTableName(String dataTableName) {
+        this.dataTableName = dataTableName;
+    }
+
+    /**
+     * @return the job name obtained by filling {@link IndexTool#INDEX_JOB_NAME_TEMPLATE}
+     *         with the data table name followed by the index table name
+     */
+    public String getJobName() {
+        return String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE, dataTableName, tableName);
+    }
+
+    @Override
+    public String toString() {
+        // An unset index type is rendered as an empty string rather than "null".
+        final String typeText;
+        if (indexType == null) {
+            typeText = "";
+        } else {
+            typeText = indexType.toString();
+        }
+        return "TableName = " + tableName
+                + " ; IndexType = " + typeText
+                + " ; TableSchem = " + tableSchem
+                + " ; DataTableName = " + dataTableName;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/137feb62/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobCallable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobCallable.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobCallable.java
new file mode 100644
index 0000000..720d2c7
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobCallable.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index.automation;
+
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.SchemaUtil;
+
+/**
+ * {@link Callable} that runs {@link IndexTool} for a single index described by a
+ * {@link PhoenixAsyncIndex}. The call result is {@code true} when the tool returns 0.
+ */
+public class PhoenixMRJobCallable implements Callable<Boolean> {
+
+    private final PhoenixAsyncIndex indexInfo;
+    private final String basePath;
+    private final Configuration conf;
+
+    /**
+     * @param conf configuration handed to {@link IndexTool}; it is modified by {@link #call()}
+     * @param indexInfo the index to build
+     * @param basePath directory under which the per-index output path is created
+     */
+    public PhoenixMRJobCallable(Configuration conf, final PhoenixAsyncIndex indexInfo,
+            String basePath) {
+        this.conf = conf;
+        this.indexInfo = indexInfo;
+        this.basePath = basePath;
+    }
+
+    /**
+     * Builds the IndexTool command line for the index and runs the tool.
+     *
+     * @return {@code true} if {@link IndexTool#run(String[])} returned 0, {@code false} otherwise
+     * @throws Exception whatever {@link IndexTool#run(String[])} throws
+     */
+    @Override
+    public Boolean call() throws Exception {
+        // Assemble the argument vector token by token. Concatenating into one string and
+        // splitting on " " produced a leading empty argument and would break any value
+        // that itself contains a space.
+        final java.util.List<String> args = new java.util.ArrayList<String>();
+        args.add("-dt");
+        args.add(indexInfo.getDataTableName());
+        args.add("-it");
+        args.add(indexInfo.getTableName());
+        args.add("-direct");
+        args.add("-op");
+        args.add((basePath.endsWith("/") ? basePath : basePath + "/") + indexInfo.getTableName());
+
+        final String schema = indexInfo.getTableSchem();
+        if (schema != null && schema.trim().length() > 0) {
+            args.add("-s");
+            args.add(schema);
+        }
+
+        // Set the output table here as well (in addition to IndexTool.java) to be doubly
+        // sure the configuration is in place.
+        final String qDataTable =
+                SchemaUtil.getTableName(indexInfo.getTableSchem(), indexInfo.getDataTableName());
+        final String qIndexTable =
+                SchemaUtil.getTableName(indexInfo.getTableSchem(), indexInfo.getTableName());
+        String physicalIndexTable = qIndexTable;
+
+        // Local indexes write to the table name derived from the data table.
+        if (IndexType.LOCAL.equals(indexInfo.getIndexType())) {
+            physicalIndexTable = MetaDataUtil.getLocalIndexTableName(qDataTable);
+        }
+        conf.set(TableOutputFormat.OUTPUT_TABLE, physicalIndexTable);
+
+        IndexTool tool = new IndexTool();
+        tool.setConf(conf);
+        int result = tool.run(args.toArray(new String[args.size()]));
+        return result == 0;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/137feb62/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java
new file mode 100644
index 0000000..9b8d5c8
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index.automation;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.security.auth.login.AppConfigurationEntry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.PTableType;
+import org.apache.phoenix.util.PhoenixMRJobUtil;
+import org.apache.phoenix.util.PhoenixMRJobUtil.MR_SCHEDULER_TYPE;
+import org.apache.phoenix.util.ZKBasedMasterElectionUtil;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONObject;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.reflect.TypeToken;
+
+public class PhoenixMRJobSubmitter {
+
+ // Lock to elect a master node that submits the Phoenix Secondary Index MR Jobs
+ private static final String PHOENIX_LOCKS_PARENT =
+ "/phoenix/automated-mr-index-build-leader-election";
+ private static final String AUTO_INDEX_BUILD_LOCK_NAME = "ActiveStandbyElectorLock";
+
+ // Query against the system catalog returning index type, data table name, schema and
+ // table name for every row that has no column name, is of table type INDEX and whose
+ // index state is BUILDING.
+ private static final String CANDIDATE_INDEX_INFO_QUERY = "SELECT "
+ + PhoenixDatabaseMetaData.INDEX_TYPE + ", " + PhoenixDatabaseMetaData.DATA_TABLE_NAME
+ + ", " + PhoenixDatabaseMetaData.TABLE_SCHEM + ", "
+ + PhoenixDatabaseMetaData.TABLE_NAME + " FROM "
+ + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " WHERE "
+ + PhoenixDatabaseMetaData.COLUMN_NAME + " is null and "
+ + PhoenixDatabaseMetaData.TABLE_TYPE + " = '" + PTableType.INDEX.getSerializedValue()
+ + "' and " + PhoenixDatabaseMetaData.INDEX_STATE + " = '"
+ + PIndexState.BUILDING.getSerializedValue() + "'";
+ // TODO - Move this to a property?
+ // Maximum time, in minutes, to wait for each submitted job's Future.
+ private static final int JOB_SUBMIT_POOL_TIMEOUT = 5;
+ // Configuration used for ZooKeeper/YARN lookups and copied for every submitted job.
+ private Configuration conf;
+ // Value of HConstants.ZOOKEEPER_QUORUM read from conf in the constructor.
+ private String zkQuorum;
+ private static final Log LOG = LogFactory.getLog(PhoenixMRJobSubmitter.class);
+
+ public PhoenixMRJobSubmitter() throws IOException {
+ this(null);
+ }
+
+ public PhoenixMRJobSubmitter(Configuration conf) throws IOException {
+ if (conf == null) {
+ conf = HBaseConfiguration.create();
+ }
+ this.conf = conf;
+
+ PhoenixMRJobUtil.updateTimeoutsToFailFast(conf);
+ String schedulerType =
+ conf.get(PhoenixMRJobUtil.PHOENIX_MR_SCHEDULER_TYPE_NAME,
+ MR_SCHEDULER_TYPE.NONE.toString());
+
+ MR_SCHEDULER_TYPE type = MR_SCHEDULER_TYPE.valueOf(schedulerType);
+
+ switch (type) {
+ case CAPACITY:
+ LOG.info("Applying the Capacity Scheduler Queue Configurations");
+ PhoenixMRJobUtil.updateCapacityQueueInfo(conf);
+ break;
+ case FAIR:
+ LOG.warn("Fair Scheduler type is not yet supported");
+ throw new IOException("Fair Scheduler is not yet supported");
+ case NONE:
+ default:
+ break;
+ }
+ zkQuorum = conf.get(HConstants.ZOOKEEPER_QUORUM);
+ // Use UGI.loginUserFromKeytab to login and work with secure clusters
+ enableKeyTabSecurity();
+ }
+
+ private void enableKeyTabSecurity() throws IOException {
+
+ final String PRINCIPAL = "principal";
+ final String KEYTAB = "keyTab";
+ // Login with the credentials from the keytab to retrieve the TGT . The
+ // renewal of the TGT happens in a Zookeeper thread
+ String principal = null;
+ String keyTabPath = null;
+ AppConfigurationEntry entries[] =
+ javax.security.auth.login.Configuration.getConfiguration()
+ .getAppConfigurationEntry("Client");
+ LOG.info("Security - Fetched App Login Configuration Entries");
+ if (entries != null) {
+ for (AppConfigurationEntry entry : entries) {
+ if (entry.getOptions().get(PRINCIPAL) != null) {
+ principal = (String) entry.getOptions().get(PRINCIPAL);
+ }
+ if (entry.getOptions().get(KEYTAB) != null) {
+ keyTabPath = (String) entry.getOptions().get(KEYTAB);
+ }
+ }
+ LOG.info("Security - Got Principal = " + principal + "");
+ if (principal != null && keyTabPath != null) {
+ LOG.info("Security - Retreiving the TGT with principal:" + principal
+ + " and keytab:" + keyTabPath);
+ UserGroupInformation.loginUserFromKeytab(principal, keyTabPath);
+ LOG.info("Security - Retrieved TGT with principal:" + principal + " and keytab:"
+ + keyTabPath);
+ }
+ }
+ }
+
+ /**
+  * Queries the system catalog for indexes in the BUILDING state.
+  *
+  * @return candidate indexes keyed by the job name produced from
+  *         {@link IndexTool#INDEX_JOB_NAME_TEMPLATE} (data table name, index table name)
+  * @throws SQLException if connecting, querying or closing the JDBC resources fails
+  */
+ public Map<String, PhoenixAsyncIndex> getCandidateJobs() throws SQLException {
+     Map<String, PhoenixAsyncIndex> candidateIndexes = new HashMap<String, PhoenixAsyncIndex>();
+     // Close connection, statement and result set on every path; previously all three leaked.
+     Connection con = DriverManager.getConnection("jdbc:phoenix:" + zkQuorum);
+     try {
+         Statement s = con.createStatement();
+         try {
+             ResultSet rs = s.executeQuery(CANDIDATE_INDEX_INFO_QUERY);
+             try {
+                 while (rs.next()) {
+                     PhoenixAsyncIndex indexInfo = new PhoenixAsyncIndex();
+                     indexInfo.setIndexType(IndexType.fromSerializedValue(rs
+                             .getByte(PhoenixDatabaseMetaData.INDEX_TYPE)));
+                     indexInfo.setDataTableName(rs
+                             .getString(PhoenixDatabaseMetaData.DATA_TABLE_NAME));
+                     indexInfo.setTableSchem(rs.getString(PhoenixDatabaseMetaData.TABLE_SCHEM));
+                     indexInfo.setTableName(rs.getString(PhoenixDatabaseMetaData.TABLE_NAME));
+                     candidateIndexes.put(String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE,
+                         indexInfo.getDataTableName(), indexInfo.getTableName()), indexInfo);
+                 }
+             } finally {
+                 rs.close();
+             }
+         } finally {
+             s.close();
+         }
+     } finally {
+         con.close();
+     }
+
+     return candidateIndexes;
+ }
+
+ public int scheduleIndexBuilds() throws Exception {
+
+ ZooKeeperWatcher zookeeperWatcher =
+ new ZooKeeperWatcher(conf, "phoenixAutomatedMRIndexBuild", null);
+
+ if (!ZKBasedMasterElectionUtil.acquireLock(zookeeperWatcher, PHOENIX_LOCKS_PARENT,
+ AUTO_INDEX_BUILD_LOCK_NAME)) {
+ LOG.info("Some other node is already running Automated Index Build. Skipping execution!");
+ return -1;
+ }
+ // 1) Query Phoenix SYSTEM.CATALOG table to get a list of all candidate indexes to be built
+ // (in state 'b')
+ // 2) Get a list of all ACCEPTED, SUBMITTED AND RUNNING jobs from Yarn Resource Manager
+ // 3) Get the jobs to submit (list from 1 - list from 2)
+
+ // Get Candidate indexes to be built
+ Map<String, PhoenixAsyncIndex> candidateJobs = getCandidateJobs();
+ LOG.info("Candidate Indexes to be built as seen from SYSTEM.CATALOG - " + candidateJobs);
+
+ // Get already scheduled Jobs list from Yarn Resource Manager
+ Set<String> submittedJobs = getSubmittedYarnApps();
+ LOG.info("Already Submitted/Running MR index build jobs - " + submittedJobs);
+
+ // Get final jobs to submit
+ Set<PhoenixAsyncIndex> jobsToSchedule = getJobsToSubmit(candidateJobs, submittedJobs);
+
+ LOG.info("Final indexes to be built - " + jobsToSchedule);
+ List<Future<Boolean>> results = new ArrayList<Future<Boolean>>(jobsToSchedule.size());
+
+ int failedJobSubmissionCount = 0;
+ int timedoutJobSubmissionCount = 0;
+ ExecutorService jobSubmitPool = Executors.newFixedThreadPool(10);
+ LOG.info("Attempt to submit MR index build jobs for - " + jobsToSchedule);
+
+ try {
+ for (PhoenixAsyncIndex indexToBuild : jobsToSchedule) {
+ PhoenixMRJobCallable task =
+ new PhoenixMRJobCallable(HBaseConfiguration.create(conf), indexToBuild, "/");
+ results.add(jobSubmitPool.submit(task));
+ }
+ for (Future<Boolean> result : results) {
+ try {
+ result.get(JOB_SUBMIT_POOL_TIMEOUT, TimeUnit.MINUTES);
+ } catch (InterruptedException e) {
+ failedJobSubmissionCount++;
+ } catch (ExecutionException e) {
+ failedJobSubmissionCount++;
+ } catch (TimeoutException e) {
+ timedoutJobSubmissionCount++;
+ }
+ }
+ } finally {
+ PhoenixMRJobUtil.shutdown(jobSubmitPool);
+ }
+
+ LOG.info("Result of Attempt to Submit MR index build Jobs - Jobs attempted = "
+ + jobsToSchedule.size() + " ; Failed to Submit = " + failedJobSubmissionCount
+ + " ; Timed out = " + timedoutJobSubmissionCount);
+ return failedJobSubmissionCount;
+ }
+
+ /**
+  * Computes the indexes that still need a build job.
+  *
+  * @param candidateJobs candidate indexes keyed by job name
+  * @param submittedJobs names of the applications already known to Yarn
+  * @return a new set holding every candidate whose job name is not in {@code submittedJobs}
+  */
+ public Set<PhoenixAsyncIndex> getJobsToSubmit(Map<String, PhoenixAsyncIndex> candidateJobs,
+         Set<String> submittedJobs) {
+     Set<PhoenixAsyncIndex> toScheduleJobs =
+             new HashSet<PhoenixAsyncIndex>(candidateJobs.values());
+     for (String jobId : submittedJobs) {
+         PhoenixAsyncIndex alreadySubmitted = candidateJobs.get(jobId);
+         if (alreadySubmitted != null) {
+             toScheduleJobs.remove(alreadySubmitted);
+         }
+     }
+     // The former toScheduleJobs.removeAll(submittedJobs) compared String job names with
+     // PhoenixAsyncIndex elements and so could never remove anything; it has been dropped.
+     return toScheduleJobs;
+ }
+
+ /**
+  * Asks the active Yarn Resource Manager for the applications in state NEW, ACCEPTED,
+  * SUBMITTED or RUNNING and returns their names.
+  *
+  * @return the application names; empty when the response has no "apps"/"app" element
+  * @throws Exception if the active Resource Manager cannot be determined or queried, or
+  *         the response cannot be parsed
+  */
+ public Set<String> getSubmittedYarnApps() throws Exception {
+     String rmHost = PhoenixMRJobUtil.getActiveResourceManagerHost(conf, zkQuorum);
+     if (rmHost == null) {
+         // Fail with a clear message instead of building a URL for the host "null".
+         throw new IOException("Could not determine the active Resource Manager host");
+     }
+     Map<String, String> urlParams = new HashMap<String, String>();
+     urlParams.put(YarnApplication.APP_STATES_ELEMENT, YarnApplication.state.NEW.toString()
+             + "," + YarnApplication.state.ACCEPTED + "," + YarnApplication.state.SUBMITTED
+             + "," + YarnApplication.state.RUNNING);
+     int rmPort = PhoenixMRJobUtil.getRMPort(conf);
+     String response = PhoenixMRJobUtil.getJobsInformationFromRM(rmHost, rmPort, urlParams);
+     LOG.debug("Already Submitted/Running Apps = " + response);
+     JSONObject jobsJson = new JSONObject(response);
+     JSONObject appsJson = jobsJson.optJSONObject(YarnApplication.APPS_ELEMENT);
+     Set<String> yarnApplicationSet = new HashSet<String>();
+
+     if (appsJson == null) {
+         return yarnApplicationSet;
+     }
+     JSONArray appJson = appsJson.optJSONArray(YarnApplication.APP_ELEMENT);
+     if (appJson == null) {
+         return yarnApplicationSet;
+     }
+
+     // One Gson instance serves every element; it used to be rebuilt per iteration.
+     Gson gson = new GsonBuilder().create();
+     for (int i = 0; i < appJson.length(); i++) {
+         YarnApplication yarnApplication =
+                 gson.fromJson(appJson.getJSONObject(i).toString(),
+                     new TypeToken<YarnApplication>() {
+                     }.getType());
+         yarnApplicationSet.add(yarnApplication.getName());
+     }
+
+     return yarnApplicationSet;
+ }
+
+ /**
+  * Command-line entry point: creates a submitter with the default configuration and runs
+  * a single scheduling pass. The value returned by {@link #scheduleIndexBuilds()} is
+  * ignored; any exception propagates out of {@code main}.
+  *
+  * @param args not used
+  */
+ public static void main(String[] args) throws Exception {
+ PhoenixMRJobSubmitter t = new PhoenixMRJobSubmitter();
+ t.scheduleIndexBuilds();
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/137feb62/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/YarnApplication.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/YarnApplication.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/YarnApplication.java
new file mode 100644
index 0000000..926aea3
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/YarnApplication.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index.automation;
+
+/**
+ * Field holder for one application entry of the Yarn Resource Manager applications
+ * listing. Instances are created by Gson from the JSON of a single "app" element, so the
+ * field names have to stay as they are.
+ *
+ * <p>{@code memorySeconds} and {@code vcoreSeconds} are stored as {@code long}: the
+ * Resource Manager reports them as long values and parsing into an {@code int} field
+ * fails once a value exceeds {@link Integer#MAX_VALUE}. The {@code int} accessors are kept
+ * for compatibility and saturate at the {@code int} range.
+ */
+public class YarnApplication {
+
+    public static final String APP_ELEMENT = "app";
+    public static final String APPS_ELEMENT = "apps";
+    public static final String APP_STATES_ELEMENT = "states";
+
+    /** Application states used when querying the Resource Manager. */
+    public enum state {
+        NEW, ACCEPTED, SUBMITTED, RUNNING, FINISHED
+    }
+
+    /** Final status values of an application. */
+    public enum finalStatus {
+        SUCCEEDED, FAILED, KILLED, UNDEFINED
+    }
+
+    private long finishedTime;
+    private String amContainerLogs;
+    private String trackingUI;
+    private String user;
+    private String id;
+    private String clusterId;
+    private String amHostHttpAddress;
+    private double progress;
+    private String name;
+    private long startedTime;
+    private long elapsedTime;
+    private String diagnostics;
+    private String trackingUrl;
+    private String queue;
+    private int allocatedMB;
+    private int allocatedVCores;
+    private int runningContainers;
+    // Widened from int so that large aggregate values can be deserialized.
+    private long memorySeconds;
+    private long vcoreSeconds;
+
+    /** Narrows a long to int, saturating at the int range instead of wrapping around. */
+    private static int saturatedInt(long value) {
+        if (value > Integer.MAX_VALUE) {
+            return Integer.MAX_VALUE;
+        }
+        if (value < Integer.MIN_VALUE) {
+            return Integer.MIN_VALUE;
+        }
+        return (int) value;
+    }
+
+    public long getFinishedTime() {
+        return finishedTime;
+    }
+
+    public void setFinishedTime(long finishedTime) {
+        this.finishedTime = finishedTime;
+    }
+
+    public String getAmContainerLogs() {
+        return amContainerLogs;
+    }
+
+    public void setAmContainerLogs(String amContainerLogs) {
+        this.amContainerLogs = amContainerLogs;
+    }
+
+    public String getTrackingUI() {
+        return trackingUI;
+    }
+
+    public void setTrackingUI(String trackingUI) {
+        this.trackingUI = trackingUI;
+    }
+
+    public String getUser() {
+        return user;
+    }
+
+    public void setUser(String user) {
+        this.user = user;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getClusterId() {
+        return clusterId;
+    }
+
+    public void setClusterId(String clusterId) {
+        this.clusterId = clusterId;
+    }
+
+    public String getAmHostHttpAddress() {
+        return amHostHttpAddress;
+    }
+
+    public void setAmHostHttpAddress(String amHostHttpAddress) {
+        this.amHostHttpAddress = amHostHttpAddress;
+    }
+
+    public double getProgress() {
+        return progress;
+    }
+
+    public void setProgress(double progress) {
+        this.progress = progress;
+    }
+
+    /** @return the application name */
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public long getStartedTime() {
+        return startedTime;
+    }
+
+    public void setStartedTime(long startedTime) {
+        this.startedTime = startedTime;
+    }
+
+    public long getElapsedTime() {
+        return elapsedTime;
+    }
+
+    public void setElapsedTime(long elapsedTime) {
+        this.elapsedTime = elapsedTime;
+    }
+
+    public String getDiagnostics() {
+        return diagnostics;
+    }
+
+    public void setDiagnostics(String diagnostics) {
+        this.diagnostics = diagnostics;
+    }
+
+    public String getTrackingUrl() {
+        return trackingUrl;
+    }
+
+    public void setTrackingUrl(String trackingUrl) {
+        this.trackingUrl = trackingUrl;
+    }
+
+    public String getQueue() {
+        return queue;
+    }
+
+    public void setQueue(String queue) {
+        this.queue = queue;
+    }
+
+    public int getAllocatedMB() {
+        return allocatedMB;
+    }
+
+    public void setAllocatedMB(int allocatedMB) {
+        this.allocatedMB = allocatedMB;
+    }
+
+    public int getAllocatedVCores() {
+        return allocatedVCores;
+    }
+
+    public void setAllocatedVCores(int allocatedVCores) {
+        this.allocatedVCores = allocatedVCores;
+    }
+
+    public int getRunningContainers() {
+        return runningContainers;
+    }
+
+    public void setRunningContainers(int runningContainers) {
+        this.runningContainers = runningContainers;
+    }
+
+    /** @return the memory-seconds value, saturated to the {@code int} range */
+    public int getMemorySeconds() {
+        return saturatedInt(memorySeconds);
+    }
+
+    public void setMemorySeconds(int memorySeconds) {
+        this.memorySeconds = memorySeconds;
+    }
+
+    /** @return the vcore-seconds value, saturated to the {@code int} range */
+    public int getVcoreSeconds() {
+        return saturatedInt(vcoreSeconds);
+    }
+
+    public void setVcoreSeconds(int vcoreSeconds) {
+        this.vcoreSeconds = vcoreSeconds;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/137feb62/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixMRJobUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixMRJobUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixMRJobUtil.java
new file mode 100644
index 0000000..f12d49d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixMRJobUtil.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.UnsupportedEncodingException;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.ProtocolException;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ActiveRMInfoProto;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.codehaus.jettison.json.JSONException;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+public class PhoenixMRJobUtil {
+
+ // ZooKeeper parent node and lock node name read to find the active Resource Manager,
+ // and the Resource Manager REST path that lists applications.
+ private static final String YARN_LEADER_ELECTION = "/yarn-leader-election";
+ private static final String ACTIVE_STANDBY_ELECTOR_LOCK = "ActiveStandbyElectorLock";
+ private static final String RM_APPS_GET_ENDPOINT = "/ws/v1/cluster/apps";
+
+ // Reduced HBase Client Retries
+ // (applied to a configuration by updateTimeoutsToFailFast)
+ private static final int CLIENT_RETRIES_NUMBER = 2;
+ private static final long CLIENT_PAUSE_TIME = 1000;
+ private static final int ZOOKEEPER_RECOVERY_RETRY_COUNT = 1;
+
+ // Property names read by updateCapacityQueueInfo
+ public static final String PHOENIX_INDEX_MR_QUEUE_NAME_PROPERTY =
+ "phoenix.index.mr.scheduler.capacity.queuename";
+ public static final String PHOENIX_INDEX_MR_MAP_MEMORY_PROPERTY =
+ "phoenix.index.mr.scheduler.capacity.mapMemoryMB";
+
+ // Default MR Capacity Scheduler Configurations for Phoenix MR Index Build
+ // Jobs
+ public static final String DEFAULT_QUEUE_NAME = "default";
+ // NOTE(review): "MEMROY" is a typo, but the constant is public so the name is kept.
+ public static final int DEFAULT_MAP_MEMROY_MB = 5120;
+ public static final String XMX_OPT = "-Xmx";
+
+ public static final String RM_HTTP_SCHEME = "http";
+ // TODO - Move these as properties?
+ // Connect timeout of 10 seconds and read timeout of 10 minutes for Resource Manager calls.
+ public static final int RM_CONNECT_TIMEOUT_MILLIS = 10 * 1000;
+ public static final int RM_READ_TIMEOUT_MILLIS = 10 * 60 * 1000;
+
+ private static final Log LOG = LogFactory.getLog(PhoenixMRJobUtil.class);
+
+ // Property selecting the scheduler type; its value is parsed as an MR_SCHEDULER_TYPE name.
+ public static final String PHOENIX_MR_SCHEDULER_TYPE_NAME = "phoenix.index.mr.scheduler.type";
+
+ public enum MR_SCHEDULER_TYPE {
+ CAPACITY, FAIR, NONE
+ };
+
+ /**
+  * Finds the host name of the active Yarn Resource Manager. Every child of
+  * {@code /yarn-leader-election} is scanned for an elector lock node; the lock data is
+  * parsed as an {@link ActiveRMInfoProto} and the Resource Manager id it carries is used
+  * to look up {@code YarnConfiguration.RM_HOSTNAME + "." + rmId} in {@code config}.
+  *
+  * @param config configuration used for the ZooKeeper watcher and the host name lookup
+  * @param zkQuorum ZooKeeper connect string
+  * @return the configured host name of the active Resource Manager, or {@code null} when
+  *         no lock node is found or no host name is configured for the active id
+  */
+ public static String getActiveResourceManagerHost(Configuration config, String zkQuorum)
+         throws IOException, InterruptedException, JSONException, KeeperException,
+         InvalidProtocolBufferException, ZooKeeperConnectionException {
+     String activeRMHost = null;
+     ZooKeeperWatcher zkw = null;
+     ZooKeeper zk = null;
+     try {
+         zkw = new ZooKeeperWatcher(config, "get-active-yarnmanager", null);
+         zk = new ZooKeeper(zkQuorum, 30000, zkw, false);
+
+         for (String electionNode : zk.getChildren(YARN_LEADER_ELECTION, zkw)) {
+             final String electionPath = YARN_LEADER_ELECTION + "/" + electionNode;
+             for (String candidate : zk.getChildren(electionPath, zkw)) {
+                 if (!candidate.contains(ACTIVE_STANDBY_ELECTOR_LOCK)) {
+                     continue;
+                 }
+                 final byte[] lockData =
+                         zk.getData(electionPath + "/" + ACTIVE_STANDBY_ELECTOR_LOCK, zkw,
+                             new Stat());
+                 final String activeRmId = ActiveRMInfoProto.parseFrom(lockData).getRmId();
+                 LOG.info("Active RmId : " + activeRmId);
+
+                 activeRMHost = config.get(YarnConfiguration.RM_HOSTNAME + "." + activeRmId);
+                 LOG.info("activeResourceManagerHostname = " + activeRMHost);
+             }
+         }
+     } finally {
+         if (zkw != null) {
+             zkw.close();
+         }
+         if (zk != null) {
+             zk.close();
+         }
+     }
+
+     return activeRMHost;
+ }
+
+ /**
+  * Issues an HTTP GET against the Resource Manager applications endpoint and returns the
+  * response body.
+  *
+  * @param rmhost Resource Manager host
+  * @param rmport Resource Manager web port
+  * @param urlParams query parameters appended as {@code key=value} pairs joined by
+  *        {@code &}; may be {@code null} or empty. Values are appended as given, without
+  *        URL encoding.
+  * @return the response body as text
+  * @throws IOException if the URL is invalid or the request fails
+  */
+ public static String getJobsInformationFromRM(String rmhost, int rmport,
+         Map<String, String> urlParams) throws MalformedURLException, ProtocolException,
+         UnsupportedEncodingException, IOException {
+     StringBuilder urlBuilder = new StringBuilder();
+     urlBuilder.append(RM_HTTP_SCHEME + "://").append(rmhost).append(":").append(rmport)
+             .append(RM_APPS_GET_ENDPOINT);
+
+     if (urlParams != null && !urlParams.isEmpty()) {
+         // '?' before the first parameter, '&' before each following one.
+         char separator = '?';
+         for (Map.Entry<String, String> param : urlParams.entrySet()) {
+             urlBuilder.append(separator).append(param.getKey()).append('=')
+                     .append(param.getValue());
+             separator = '&';
+         }
+     }
+
+     String url = urlBuilder.toString();
+     LOG.info("Attempt to get running/submitted jobs information from RM URL = " + url);
+
+     HttpURLConnection con = null;
+     String response;
+     int responseCode;
+     try {
+         con = (HttpURLConnection) new URL(url).openConnection();
+         con.setInstanceFollowRedirects(true);
+         con.setRequestMethod("GET");
+
+         con.setConnectTimeout(RM_CONNECT_TIMEOUT_MILLIS);
+         con.setReadTimeout(RM_READ_TIMEOUT_MILLIS);
+
+         response = getTextContent(con.getInputStream());
+         // Read the status while the connection is still open; it used to be queried
+         // only after disconnect().
+         responseCode = con.getResponseCode();
+     } finally {
+         if (con != null) {
+             con.disconnect();
+         }
+     }
+
+     LOG.info("Result of attempt to get running/submitted jobs from RM - URL=" + url
+             + ",ResponseCode=" + responseCode + ",Response=" + response);
+
+     return response;
+ }
+
+ /**
+  * Reads the whole stream as text, one line at a time, and closes it.
+  *
+  * <p>The bytes are decoded as UTF-8 rather than with the platform default charset, so
+  * the result does not depend on the JVM's locale settings.
+  *
+  * @param is stream to read; it is closed before this method returns
+  * @return the content, with every line terminated by {@code "\n"}
+  * @throws IOException if reading or closing the stream fails
+  */
+ public static String getTextContent(InputStream is) throws IOException {
+     BufferedReader in = null;
+     try {
+         in = new BufferedReader(new InputStreamReader(is, "UTF-8"));
+         StringBuilder response = new StringBuilder();
+         String inputLine;
+         while ((inputLine = in.readLine()) != null) {
+             response.append(inputLine).append("\n");
+         }
+         return response.toString();
+     } finally {
+         if (in != null) {
+             in.close();
+         }
+         // Also covers the case where the reader could not be created.
+         if (is != null) {
+             is.close();
+         }
+     }
+ }
+
+ public static void shutdown(ExecutorService pool) throws InterruptedException {
+ pool.shutdown();
+ LOG.debug("Shutdown called");
+ pool.awaitTermination(200, TimeUnit.MILLISECONDS);
+ LOG.debug("Await termination called to wait for 200 msec");
+ if (!pool.isShutdown()) {
+ pool.shutdownNow();
+ LOG.debug("Await termination called to wait for 200 msec");
+ pool.awaitTermination(100, TimeUnit.MILLISECONDS);
+ }
+ if (!pool.isShutdown()) {
+ LOG.warn("Pool did not shutdown");
+ }
+ }
+
+ /**
+  * Extracts the port from the {@code host:port} value of
+  * {@code YarnConfiguration.RM_WEBAPP_ADDRESS}.
+  *
+  * @param conf configuration to read the property from
+  * @return the port number
+  * @throws IOException if the property is not set or is not of the form {@code host:port}
+  * @throws NumberFormatException if the port part is not a number
+  */
+ public static int getRMPort(Configuration conf) throws IOException {
+     String rmHostPortStr = conf.get(YarnConfiguration.RM_WEBAPP_ADDRESS);
+     // An unset property used to surface as a NullPointerException from split().
+     String[] rmHostPort = (rmHostPortStr == null) ? null : rmHostPortStr.split(":");
+     if (rmHostPort == null || rmHostPort.length != 2) {
+         throw new IOException("Invalid value for property "
+                 + YarnConfiguration.RM_WEBAPP_ADDRESS + " = " + rmHostPortStr);
+     }
+     return Integer.parseInt(rmHostPort[1].trim());
+ }
+
+ /**
+  * Overrides the HBase client retry count, the ZooKeeper recovery retry count and the
+  * HBase client pause in the given configuration with the reduced values defined in
+  * this class.
+  *
+  * @param conf configuration to modify
+  */
+ public static void updateTimeoutsToFailFast(Configuration conf) {
+     conf.setInt("hbase.client.retries.number", CLIENT_RETRIES_NUMBER);
+     conf.setInt("zookeeper.recovery.retry", ZOOKEEPER_RECOVERY_RETRY_COUNT);
+     conf.setLong("hbase.client.pause", CLIENT_PAUSE_TIME);
+ }
+
+ /**
+  * Applies the Capacity scheduler settings to the configuration: the queue name, the map
+  * task memory and a map JVM heap option set to 90% of that memory. Queue name and map
+  * memory come from the Phoenix properties, falling back to the defaults of this class.
+  * @param conf - Configuration to which the Capacity Queue information is added
+  */
+ public static void updateCapacityQueueInfo(Configuration conf) {
+     final String queueName = conf.get(PHOENIX_INDEX_MR_QUEUE_NAME_PROPERTY, DEFAULT_QUEUE_NAME);
+     conf.set(MRJobConfig.QUEUE_NAME, queueName);
+
+     final int mapMemoryMB =
+             conf.getInt(PHOENIX_INDEX_MR_MAP_MEMORY_PROPERTY, DEFAULT_MAP_MEMROY_MB);
+     conf.setInt(MRJobConfig.MAP_MEMORY_MB, mapMemoryMB);
+
+     // The heap is capped at 90% of the memory requested for the map task.
+     final int mapHeapMB = (int) (mapMemoryMB * 0.9);
+     conf.set(MRJobConfig.MAP_JAVA_OPTS, XMX_OPT + mapHeapMB + "m");
+
+     LOG.info("Queue Name=" + conf.get(MRJobConfig.QUEUE_NAME)
+             + ";Map Meory MB=" + conf.get(MRJobConfig.MAP_MEMORY_MB)
+             + ";Map Java Opts=" + conf.get(MRJobConfig.MAP_JAVA_OPTS));
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/137feb62/phoenix-core/src/main/java/org/apache/phoenix/util/ZKBasedMasterElectionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ZKBasedMasterElectionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ZKBasedMasterElectionUtil.java
new file mode 100644
index 0000000..69ef0b5
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ZKBasedMasterElectionUtil.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Ids;
+
+public class ZKBasedMasterElectionUtil {
+
+ private static final Log LOG = LogFactory.getLog(ZKBasedMasterElectionUtil.class);
+
+ public static boolean acquireLock(ZooKeeperWatcher zooKeeperWatcher, String parentNode,
+ String lockName) throws KeeperException, InterruptedException {
+ // Create the parent node as Persistent
+ LOG.info("Creating the parent lock node:" + parentNode);
+ ZKUtil.createWithParents(zooKeeperWatcher, parentNode);
+
+ // Create the ephemeral node
+ String lockNode = parentNode + "/" + lockName;
+ String nodeValue = getHostName() + "_" + UUID.randomUUID().toString();
+ LOG.info("Trying to acquire the lock by creating node:" + lockNode + " value:" + nodeValue);
+ // Create the ephemeral node
+ try {
+ zooKeeperWatcher.getRecoverableZooKeeper().create(lockNode, Bytes.toBytes(nodeValue),
+ Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ } catch (KeeperException.NodeExistsException e) {
+ LOG.info("Could not acquire lock. Another process had already acquired the lock on Node "
+ + lockName);
+ return false;
+ }
+ LOG.info("Obtained the lock :" + lockNode);
+ return true;
+ }
+
+ private static String getHostName() {
+ String host = "";
+ try {
+ host = InetAddress.getLocalHost().getCanonicalHostName();
+ } catch (UnknownHostException e) {
+ LOG.error("UnknownHostException while trying to get the Local Host address : ", e);
+ }
+ return host;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/137feb62/phoenix-core/src/test/java/org/apache/phoenix/index/automated/TestMRJobSubmitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/automated/TestMRJobSubmitter.java b/phoenix-core/src/test/java/org/apache/phoenix/index/automated/TestMRJobSubmitter.java
new file mode 100644
index 0000000..649f22a
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/index/automated/TestMRJobSubmitter.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.index.automated;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.mapreduce.index.automation.PhoenixAsyncIndex;
+import org.apache.phoenix.mapreduce.index.automation.PhoenixMRJobSubmitter;
+import org.apache.phoenix.schema.PTable.IndexType;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link PhoenixMRJobSubmitter#getJobsToSubmit}: which candidate index jobs are
+ * selected given the set of job names that are already submitted.
+ */
+public class TestMRJobSubmitter {
+
+    private Map<String, PhoenixAsyncIndex> candidateJobs =
+            new LinkedHashMap<String, PhoenixAsyncIndex>();
+    private Set<String> submittedJobs = new HashSet<String>();
+
+    /** Registers two LOCAL candidate indexes, (DT1, IT1) and (DT2, IT2), keyed by job name. */
+    @Before
+    public void prepare() {
+        addLocalCandidate("DT1", "IT1");
+        addLocalCandidate("DT2", "IT2");
+    }
+
+    @Test
+    public void testLocalIndexJobsSubmission() throws IOException {
+        setIndexTypeOnAllCandidates(IndexType.LOCAL);
+
+        Set<PhoenixAsyncIndex> toSubmit = computeJobsToSubmit();
+        assertEquals(2, toSubmit.size());
+    }
+
+    @Test
+    public void testGlobalIndexJobsForSubmission() throws IOException {
+        setIndexTypeOnAllCandidates(IndexType.GLOBAL);
+
+        Set<PhoenixAsyncIndex> toSubmit = computeJobsToSubmit();
+        assertEquals(2, toSubmit.size());
+        assertEquals(true, toSubmit.containsAll(candidateJobs.values()));
+    }
+
+    @Test
+    public void testSkipSubmittedJob() throws IOException {
+        PhoenixAsyncIndex[] candidates = candidatesAsArray();
+
+        // Mark one job as running
+        markSubmitted(candidates[0].getDataTableName(), candidates[0].getTableName());
+
+        Set<PhoenixAsyncIndex> toSubmit = computeJobsToSubmit();
+
+        // Only the job that is not running may be selected
+        assertEquals(1, toSubmit.size());
+        assertEquals(false, toSubmit.containsAll(candidateJobs.values()));
+        assertEquals(true, toSubmit.contains(candidates[1]));
+    }
+
+    @Test
+    public void testSkipAllSubmittedJobs() throws IOException {
+        PhoenixAsyncIndex[] candidates = candidatesAsArray();
+
+        // Mark all the candidate jobs as running/in-progress
+        markSubmitted(candidates[0].getDataTableName(), candidates[0].getTableName());
+        markSubmitted(candidates[1].getDataTableName(), candidates[1].getTableName());
+
+        Set<PhoenixAsyncIndex> toSubmit = computeJobsToSubmit();
+        assertEquals(0, toSubmit.size());
+    }
+
+    @Test
+    public void testNoJobsToSubmit() throws IOException {
+        // No candidates at all, only some unrelated running jobs
+        candidateJobs.clear();
+        markSubmitted("d1", "i1");
+        markSubmitted("d2", "i2");
+
+        Set<PhoenixAsyncIndex> toSubmit = computeJobsToSubmit();
+        assertEquals(0, toSubmit.size());
+    }
+
+    /** Formats the job name for a data table / index table pair. */
+    private static String jobNameFor(String dataTableName, String indexTableName) {
+        return String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE, dataTableName, indexTableName);
+    }
+
+    /** Creates a LOCAL index for the given tables and stores it under its job name. */
+    private void addLocalCandidate(String dataTableName, String indexTableName) {
+        PhoenixAsyncIndex index = new PhoenixAsyncIndex();
+        index.setDataTableName(dataTableName);
+        index.setTableName(indexTableName);
+        index.setIndexType(IndexType.LOCAL);
+        candidateJobs.put(jobNameFor(index.getDataTableName(), index.getTableName()), index);
+    }
+
+    /** Applies the given index type to every candidate. */
+    private void setIndexTypeOnAllCandidates(IndexType type) {
+        for (PhoenixAsyncIndex index : candidateJobs.values()) {
+            index.setIndexType(type);
+        }
+    }
+
+    /** Adds the job name of the given table pair to the submitted set. */
+    private void markSubmitted(String dataTableName, String indexTableName) {
+        submittedJobs.add(jobNameFor(dataTableName, indexTableName));
+    }
+
+    /** Returns the candidates in insertion order. */
+    private PhoenixAsyncIndex[] candidatesAsArray() {
+        return candidateJobs.values().toArray(new PhoenixAsyncIndex[candidateJobs.size()]);
+    }
+
+    /** Runs a fresh submitter against the current candidate and submitted collections. */
+    private Set<PhoenixAsyncIndex> computeJobsToSubmit() throws IOException {
+        PhoenixMRJobSubmitter submitter = new PhoenixMRJobSubmitter();
+        return submitter.getJobsToSubmit(candidateJobs, submittedJobs);
+    }
+}
\ No newline at end of file
[2/2] phoenix git commit: PHOENIX-2998 Add test for setting RPC
timeout for a connection
Posted by ja...@apache.org.
PHOENIX-2998 Add test for setting RPC timeout for a connection
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/56ba3738
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/56ba3738
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/56ba3738
Branch: refs/heads/4.x-HBase-1.1
Commit: 56ba3738652fd2b7f14f3ee0f3d761c64e003e7a
Parents: 137feb6
Author: James Taylor <ja...@apache.org>
Authored: Tue Jun 14 17:16:33 2016 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Mon Jun 20 10:51:30 2016 -0700
----------------------------------------------------------------------
.../apache/phoenix/end2end/QueryTimeoutIT.java | 32 +++++++++++++++++-
.../java/org/apache/phoenix/util/QueryUtil.java | 34 +++++++++++++++-----
2 files changed, 57 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/56ba3738/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryTimeoutIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryTimeoutIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryTimeoutIT.java
index ccd6530..fd1f5db 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryTimeoutIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryTimeoutIT.java
@@ -21,6 +21,7 @@ package org.apache.phoenix.end2end;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -36,8 +37,11 @@ import java.util.Properties;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.junit.After;
import org.junit.BeforeClass;
@@ -51,11 +55,12 @@ public class QueryTimeoutIT extends BaseOwnClusterHBaseManagedTimeIT {
@BeforeClass
public static void doSetup() throws Exception {
- Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+ Map<String,String> props = Maps.newHashMapWithExpectedSize(5);
// Must update config before starting server
props.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(700));
props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(10000));
props.put(QueryServices.EXPLAIN_CHUNK_COUNT_ATTRIB, Boolean.TRUE.toString());
+ props.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
@@ -71,6 +76,31 @@ public class QueryTimeoutIT extends BaseOwnClusterHBaseManagedTimeIT {
}
@Test
+ public void testSetRPCTimeOnConnection() throws Exception {
+     // Checks that a connection opened with an overridden hbase.rpc.timeout sees that value
+     // in its query services properties, that it does not share its ConnectionQueryServices
+     // with connections opened on the default URL, and that connections opened with the
+     // same URL/properties do share one.
+     // All connections stay open until the end of the test and are closed in the finally
+     // block so that a failing assertion does not leak them.
+     Connection conn1 = null;
+     Connection conn2 = null;
+     Connection conn3 = null;
+     Connection conn4 = null;
+     try {
+         Properties overriddenProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+         overriddenProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+         overriddenProps.setProperty("hbase.rpc.timeout", Long.toString(100));
+         String url = QueryUtil.getConnectionUrl(overriddenProps, config, "longRunning");
+         conn1 = DriverManager.getConnection(url, overriddenProps);
+         ConnectionQueryServices s1 = conn1.unwrap(PhoenixConnection.class).getQueryServices();
+         ReadOnlyProps configProps = s1.getProps();
+         assertEquals("100", configProps.get("hbase.rpc.timeout"));
+
+         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+         props.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+         conn2 = DriverManager.getConnection(getUrl(), props);
+         ConnectionQueryServices s2 = conn2.unwrap(PhoenixConnection.class).getQueryServices();
+         assertFalse(s1 == s2);
+         conn3 = DriverManager.getConnection(getUrl(), props);
+         ConnectionQueryServices s3 = conn3.unwrap(PhoenixConnection.class).getQueryServices();
+         assertTrue(s2 == s3);
+
+         conn4 = DriverManager.getConnection(url, overriddenProps);
+         ConnectionQueryServices s4 = conn4.unwrap(PhoenixConnection.class).getQueryServices();
+         assertTrue(s1 == s4);
+     } finally {
+         for (Connection conn : new Connection[] { conn1, conn2, conn3, conn4 }) {
+             if (conn != null) {
+                 conn.close();
+             }
+         }
+     }
+ }
+
+ @Test
public void testQueryTimeout() throws Exception {
int nRows = 30000;
Connection conn;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/56ba3738/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
index 48259e0..6d8e00d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
@@ -233,39 +233,53 @@ public final class QueryUtil {
* Create the Phoenix JDBC connection URL from the provided cluster connection details.
*/
public static String getUrl(String zkQuorum) {
- return getUrlInternal(zkQuorum, null, null);
+ return getUrlInternal(zkQuorum, null, null, null);
}
/**
* Create the Phoenix JDBC connection URL from the provided cluster connection details.
*/
public static String getUrl(String zkQuorum, int clientPort) {
- return getUrlInternal(zkQuorum, clientPort, null);
+ return getUrlInternal(zkQuorum, clientPort, null, null);
}
/**
* Create the Phoenix JDBC connection URL from the provided cluster connection details.
*/
public static String getUrl(String zkQuorum, String znodeParent) {
- return getUrlInternal(zkQuorum, null, znodeParent);
+ return getUrlInternal(zkQuorum, null, znodeParent, null);
+ }
+
+ /**
+ * Create the Phoenix JDBC connection URL from the provided cluster connection details.
+ * Delegates to getUrlInternal, passing all four arguments through.
+ * @param zkQuorum ZooKeeper quorum string, passed through unchanged
+ * @param port port number, passed through (getUrlInternal takes it as an Integer)
+ * @param znodeParent znode parent, passed through unchanged
+ * @param principal principal, passed through unchanged
+ *        (NOTE(review): presumably the security principal placed in the URL - confirm
+ *        against PhoenixEmbeddedDriver.ConnectionInfo)
+ * @return the URL string produced by getUrlInternal
+ */
+ public static String getUrl(String zkQuorum, int port, String znodeParent, String principal) {
+ return getUrlInternal(zkQuorum, port, znodeParent, principal);
}
/**
* Create the Phoenix JDBC connection URL from the provided cluster connection details.
*/
public static String getUrl(String zkQuorum, int port, String znodeParent) {
- return getUrlInternal(zkQuorum, port, znodeParent);
+ return getUrlInternal(zkQuorum, port, znodeParent, null);
}
/**
* Create the Phoenix JDBC connection URL from the provided cluster connection details.
*/
public static String getUrl(String zkQuorum, Integer port, String znodeParent) {
- return getUrlInternal(zkQuorum, port, znodeParent);
+ return getUrlInternal(zkQuorum, port, znodeParent, null);
+ }
+
+ /**
+ * Create the Phoenix JDBC connection URL from the provided cluster connection details.
+ * Delegates to getUrlInternal, passing all four arguments through.
+ * @param zkQuorum ZooKeeper quorum string, passed through unchanged
+ * @param port port number as an Integer, passed through unchanged
+ *        (other overloads in this file pass null here)
+ * @param znodeParent znode parent, passed through unchanged
+ * @param principal principal, passed through unchanged
+ *        (NOTE(review): presumably the security principal placed in the URL - confirm
+ *        against PhoenixEmbeddedDriver.ConnectionInfo)
+ * @return the URL string produced by getUrlInternal
+ */
+ public static String getUrl(String zkQuorum, Integer port, String znodeParent, String principal) {
+ return getUrlInternal(zkQuorum, port, znodeParent, principal);
}
- private static String getUrlInternal(String zkQuorum, Integer port, String znodeParent) {
- return new PhoenixEmbeddedDriver.ConnectionInfo(zkQuorum, port, znodeParent).toUrl()
+ private static String getUrlInternal(String zkQuorum, Integer port, String znodeParent, String principal) {
+ return new PhoenixEmbeddedDriver.ConnectionInfo(zkQuorum, port, znodeParent, principal, null).toUrl()
+ PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
}
@@ -328,6 +342,10 @@ public final class QueryUtil {
public static String getConnectionUrl(Properties props, Configuration conf)
throws ClassNotFoundException, SQLException {
+ return getConnectionUrl(props, conf, null);
+ }
+ public static String getConnectionUrl(Properties props, Configuration conf, String principal)
+ throws ClassNotFoundException, SQLException {
// TODO: props is ignored!
// read the hbase properties from the configuration
String server = ZKConfig.getZKQuorumServersString(conf);
@@ -362,7 +380,7 @@ public final class QueryUtil {
server = Joiner.on(',').join(servers);
String znodeParent = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
- String url = getUrl(server, port, znodeParent);
+ String url = getUrl(server, port, znodeParent, principal);
// Mainly for testing to tack on the test=true part to ensure driver is found on server
String extraArgs = conf.get(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
if (extraArgs.length() > 0) {