Posted to commits@hive.apache.org by jd...@apache.org on 2016/05/01 03:30:55 UTC
[01/11] hive git commit: HIVE-13588: NPE is thrown from MapredLocalTask.executeInChildVM (Chaoyu Tang, reviewed by Yongzhi Chen)
Repository: hive
Updated Branches:
refs/heads/llap 390cb8cd0 -> 9f999f252
HIVE-13588: NPE is thrown from MapredLocalTask.executeInChildVM (Chaoyu Tang, reviewed by Yongzhi Chen)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ba864a24
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ba864a24
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ba864a24
Branch: refs/heads/llap
Commit: ba864a241b08c7916be1eefe08f7972451aeda15
Parents: 0ebcd93
Author: ctang <ct...@cloudera.com>
Authored: Thu Apr 28 22:06:47 2016 -0400
Committer: ctang <ct...@cloudera.com>
Committed: Thu Apr 28 22:06:47 2016 -0400
----------------------------------------------------------------------
.../hadoop/hive/ql/exec/mr/MapredLocalTask.java | 16 ++++++++++++----
.../clientpositive/auto_sortmerge_join_8.q.out | 2 --
.../results/clientpositive/llap/tez_join_hash.q.out | 4 ----
.../results/clientpositive/tez/tez_join_hash.q.out | 4 ----
4 files changed, 12 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/ba864a24/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
index 3c1f0de..24bf506 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
@@ -319,10 +319,18 @@ public class MapredLocalTask extends Task<MapredLocalWork> implements Serializab
CachingPrintStream errPrintStream = new CachingPrintStream(System.err);
- StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out,
- OperationLog.getCurrentOperationLog().getPrintStream());
- StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, errPrintStream,
- OperationLog.getCurrentOperationLog().getPrintStream());
+ StreamPrinter outPrinter;
+ StreamPrinter errPrinter;
+ OperationLog operationLog = OperationLog.getCurrentOperationLog();
+ if (operationLog != null) {
+ outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out,
+ operationLog.getPrintStream());
+ errPrinter = new StreamPrinter(executor.getErrorStream(), null, errPrintStream,
+ operationLog.getPrintStream());
+ } else {
+ outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out);
+ errPrinter = new StreamPrinter(executor.getErrorStream(), null, errPrintStream);
+ }
outPrinter.start();
errPrinter.start();
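The NPE fixed here comes from OperationLog.getCurrentOperationLog(): the log is thread-local and is typically set only while a HiveServer2 operation is active, so a local task launched outside that context gets null back. A minimal sketch of the guard the patch introduces, reusing only names from the diff above:

    // Hedged sketch: the thread-local OperationLog may be absent, so it
    // must be null-checked before its PrintStream is handed to a printer.
    OperationLog operationLog = OperationLog.getCurrentOperationLog();
    if (operationLog != null) {
      operationLog.getPrintStream().println("mirroring child-VM output");
    }
    // else: fall back to System.out / the cached err stream only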
http://git-wip-us.apache.org/repos/asf/hive/blob/ba864a24/ql/src/test/results/clientpositive/auto_sortmerge_join_8.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/auto_sortmerge_join_8.q.out b/ql/src/test/results/clientpositive/auto_sortmerge_join_8.q.out
index ce0590c..23a3685 100644
--- a/ql/src/test/results/clientpositive/auto_sortmerge_join_8.q.out
+++ b/ql/src/test/results/clientpositive/auto_sortmerge_join_8.q.out
@@ -1411,8 +1411,6 @@ PREHOOK: Input: default@bucket_small
PREHOOK: Input: default@bucket_small@ds=2008-04-08
PREHOOK: Input: default@bucket_small@ds=2008-04-09
#### A masked pattern was here ####
-FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
-ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
POSTHOOK: query: select count(*) FROM bucket_big a JOIN bucket_small b ON a.key = b.key
POSTHOOK: type: QUERY
POSTHOOK: Input: default@bucket_big
http://git-wip-us.apache.org/repos/asf/hive/blob/ba864a24/ql/src/test/results/clientpositive/llap/tez_join_hash.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/tez_join_hash.q.out b/ql/src/test/results/clientpositive/llap/tez_join_hash.q.out
index 1fd45aa..54ca9d2 100644
--- a/ql/src/test/results/clientpositive/llap/tez_join_hash.q.out
+++ b/ql/src/test/results/clientpositive/llap/tez_join_hash.q.out
@@ -652,10 +652,6 @@ PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
#### A masked pattern was here ####
-FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
-ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
-FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
-ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
POSTHOOK: query: select key, count(*) from (select x.key as key, y.value as value from
srcpart x join srcpart y on (x.key = y.key)
union all
http://git-wip-us.apache.org/repos/asf/hive/blob/ba864a24/ql/src/test/results/clientpositive/tez/tez_join_hash.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/tez/tez_join_hash.q.out b/ql/src/test/results/clientpositive/tez/tez_join_hash.q.out
index 2f51094..8d0aba1 100644
--- a/ql/src/test/results/clientpositive/tez/tez_join_hash.q.out
+++ b/ql/src/test/results/clientpositive/tez/tez_join_hash.q.out
@@ -638,10 +638,6 @@ PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
#### A masked pattern was here ####
-FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
-ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
-FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
-ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
POSTHOOK: query: select key, count(*) from (select x.key as key, y.value as value from
srcpart x join srcpart y on (x.key = y.key)
union all
[08/11] hive git commit: HIVE-12963 : LIMIT statement with SORT BY creates additional MR job with hardcoded only one reducer (Alina Abramova, reviewed by Sergey Shelukhin)
Posted by jd...@apache.org.
HIVE-12963 : LIMIT statement with SORT BY creates additional MR job with hardcoded only one reducer (Alina Abramova, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/324a2c6e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/324a2c6e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/324a2c6e
Branch: refs/heads/llap
Commit: 324a2c6ec86aba7c1baf74caf52b615b400825c9
Parents: 6460529
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Apr 29 12:13:18 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Apr 29 12:13:18 2016 -0700
----------------------------------------------------------------------
common/src/java/org/apache/hadoop/hive/conf/HiveConf.java | 2 ++
itests/src/test/resources/testconfiguration.properties | 1 +
ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java | 2 +-
ql/src/test/results/clientpositive/groupby1_limit.q.out | 2 +-
4 files changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/324a2c6e/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 46a3b96..b13de92 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1009,6 +1009,8 @@ public class HiveConf extends Configuration {
"This parameter decides if Hive should add an additional map-reduce job. If the grouping set\n" +
"cardinality (4 in the example above), is more than this value, a new MR job is added under the\n" +
"assumption that the original group by will reduce the data size."),
+ HIVE_GROUPBY_LIMIT_EXTRASTEP("hive.groupby.limit.extrastep", true, "This parameter decides if Hive should \n" +
+ "create new MR job for sorting final output"),
// Max filesize used to do a single copy (after that, distcp is used)
HIVE_EXEC_COPYFILE_MAXSIZE("hive.exec.copyfile.maxsize", 32L * 1024 * 1024 /*32M*/,
http://git-wip-us.apache.org/repos/asf/hive/blob/324a2c6e/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index e46e6ce..0ef3161 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -874,6 +874,7 @@ spark.query.files=add_part_multiple.q, \
groupby_sort_1_23.q, \
groupby_sort_skew_1.q, \
groupby_sort_skew_1_23.q, \
+ qroupby_limit_extrastep.q, \
having.q, \
identity_project_remove_skip.q, \
index_auto_self_join.q, \
http://git-wip-us.apache.org/repos/asf/hive/blob/324a2c6e/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index cfe4497..06db7f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -7300,7 +7300,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
Operator curr = genLimitPlan(dest, qb, input, offset, limit);
// the client requested that an extra map-reduce step be performed
- if (!extraMRStep) {
+ if (!extraMRStep || !HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_GROUPBY_LIMIT_EXTRASTEP)){
return curr;
}
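The new flag is consulted alongside the per-query extraMRStep decision, so either one can suppress the extra single-reducer sort stage that used to be added unconditionally after LIMIT. A hedged sketch of reading the flag, using only identifiers from the diffs above:

    // hive.groupby.limit.extrastep defaults to true, preserving the old
    // behavior of adding the extra MR job that sorts the final output.
    HiveConf conf = new HiveConf();
    boolean extraStep = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_GROUPBY_LIMIT_EXTRASTEP);
    // clients can disable it per session: set hive.groupby.limit.extrastep=false;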
http://git-wip-us.apache.org/repos/asf/hive/blob/324a2c6e/ql/src/test/results/clientpositive/groupby1_limit.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/groupby1_limit.q.out b/ql/src/test/results/clientpositive/groupby1_limit.q.out
index aacd23c..8d7fbfa 100644
--- a/ql/src/test/results/clientpositive/groupby1_limit.q.out
+++ b/ql/src/test/results/clientpositive/groupby1_limit.q.out
@@ -68,7 +68,7 @@ STAGE PLANS:
Map Operator Tree:
TableScan
Reduce Output Operator
- sort order:
+ sort order:
Statistics: Num rows: 5 Data size: 50 Basic stats: COMPLETE Column stats: NONE
TopN Hash Memory Usage: 0.1
value expressions: _col0 (type: string), _col1 (type: double)
[11/11] hive git commit: Merge branch 'master' into llap
Posted by jd...@apache.org.
Merge branch 'master' into llap
Conflicts:
common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9f999f25
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9f999f25
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9f999f25
Branch: refs/heads/llap
Commit: 9f999f252746a11c766624ca947e31f3fe59ec07
Parents: 390cb8c 9e1fa0c
Author: Jason Dere <jd...@hortonworks.com>
Authored: Sat Apr 30 18:30:25 2016 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Sat Apr 30 18:30:25 2016 -0700
----------------------------------------------------------------------
HIVE-13509.2.patch | 478 ++++++++++++
.../org/apache/hadoop/hive/conf/HiveConf.java | 10 +-
dev-support/jenkins-common.sh | 14 +
dev-support/jenkins-submit-build.sh | 4 +
.../hive/hcatalog/common/HCatConstants.java | 3 +
.../hcatalog/mapreduce/HCatBaseInputFormat.java | 29 +-
.../hive/hcatalog/pig/TestHCatLoader.java | 55 ++
.../service/cli/session/TestQueryDisplay.java | 4 +-
.../test/resources/testconfiguration.properties | 1 +
.../impl/LlapZookeeperRegistryImpl.java | 74 +-
.../hive/llap/tezplugins/ContainerFactory.java | 3 +-
.../tezplugins/LlapTaskSchedulerService.java | 377 ++++++++--
.../llap/tezplugins/helpers/MonotonicClock.java | 24 +
.../scheduler/LoggingFutureCallback.java | 44 ++
.../TestLlapTaskSchedulerService.java | 734 ++++++++++++++++++-
.../hadoop/hive/metastore/HiveMetaStore.java | 2 +-
.../metastore/TestHiveMetaStoreGetMetaConf.java | 151 ++++
.../java/org/apache/hadoop/hive/ql/Driver.java | 13 +-
.../org/apache/hadoop/hive/ql/QueryDisplay.java | 19 +-
.../org/apache/hadoop/hive/ql/QueryPlan.java | 16 +-
.../hadoop/hive/ql/exec/ConditionalTask.java | 1 +
.../apache/hadoop/hive/ql/exec/Registry.java | 323 +++++---
.../org/apache/hadoop/hive/ql/exec/Task.java | 14 +-
.../hadoop/hive/ql/exec/mr/ExecDriver.java | 2 +-
.../hive/ql/exec/mr/HadoopJobExecHelper.java | 6 +-
.../hadoop/hive/ql/exec/mr/MapredLocalTask.java | 16 +-
.../hadoop/hive/ql/exec/tez/DagUtils.java | 1 -
.../apache/hadoop/hive/ql/metadata/Hive.java | 39 +-
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 2 +-
.../hadoop/hive/ql/session/SessionState.java | 3 +-
.../clientpositive/auto_sortmerge_join_8.q.out | 2 -
.../results/clientpositive/groupby1_limit.q.out | 2 +-
.../clientpositive/llap/tez_join_hash.q.out | 4 -
.../clientpositive/tez/tez_join_hash.q.out | 4 -
.../apache/hive/service/cli/CLIServiceTest.java | 4 +-
.../apache/hadoop/hive/shims/ShimLoader.java | 10 +-
36 files changed, 2172 insertions(+), 316 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/9f999f25/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --cc common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index d7afa4d,b13de92..7db492f
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@@ -2806,8 -2808,9 +2811,11 @@@ public class HiveConf extends Configura
false,
"Whether to setup split locations to match nodes on which llap daemons are running," +
" instead of using the locations provided by the split itself"),
+ LLAP_VALIDATE_ACLS("hive.llap.validate.acls", true,
+ "Whether LLAP should reject permissive ACLs in some cases (e.g. its own management\n" +
+ "protocol or ZK paths), similar to how ssh refuses a key with bad access permissions."),
+ LLAP_DAEMON_OUTPUT_SERVICE_PORT("hive.llap.daemon.output.service.port", 15003,
+ "LLAP daemon output service port"),
SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
"60s", new TimeValidator(TimeUnit.SECONDS),
http://git-wip-us.apache.org/repos/asf/hive/blob/9f999f25/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
----------------------------------------------------------------------
diff --cc llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index 0dc7599,6981061..fde70e7
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@@ -87,8 -89,8 +89,9 @@@ public class LlapZookeeperRegistryImpl
private static final String IPC_MNG = "llapmng";
private static final String IPC_SHUFFLE = "shuffle";
private static final String IPC_LLAP = "llap";
+ private static final String IPC_OUTPUTFORMAT = "llapoutputformat";
private final static String ROOT_NAMESPACE = "llap";
+ private final static String USER_SCOPE_PATH_PREFIX = "user-";
private final Configuration conf;
private final CuratorFramework zooKeeperClient;
@@@ -173,7 -177,8 +178,8 @@@
// worker does not respond due to communication interruptions it will retain the same sequence
// number when it returns back. If session timeout expires, the node will be deleted and new
// addition of the same node (restart) will get next sequence number
- this.pathPrefix = "/" + getZkPathUser(this.conf) + "/" + instanceName + "/workers/worker-";
- this.userPathPrefix = USER_SCOPE_PATH_PREFIX + RegistryUtils.currentUser();
++ this.userPathPrefix = USER_SCOPE_PATH_PREFIX + getZkPathUser(this.conf);
+ this.pathPrefix = "/" + userPathPrefix + "/" + instanceName + "/workers/worker-";
this.instancesCache = null;
this.instances = null;
this.stateChangeListeners = new HashSet<>();
http://git-wip-us.apache.org/repos/asf/hive/blob/9f999f25/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
[05/11] hive git commit: HIVE-13510 : Dynamic partitioning doesn’t work when remote metastore is used (Illya Yalovyy via Ashutosh Chauhan)
Posted by jd...@apache.org.
HIVE-13510 : Dynamic partitioning doesn’t work when remote metastore is used (Illya Yalovyy via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/134b6ccc
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/134b6ccc
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/134b6ccc
Branch: refs/heads/llap
Commit: 134b6cccbd7237901f7f7594626796863ca0150a
Parents: 347a5a5
Author: Illya Yalovyy <ya...@amazon.com>
Authored: Tue Apr 26 12:18:00 2016 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Fri Apr 29 03:36:36 2016 -0700
----------------------------------------------------------------------
.../hadoop/hive/metastore/HiveMetaStore.java | 2 +-
.../metastore/TestHiveMetaStoreGetMetaConf.java | 151 +++++++++++++++++++
2 files changed, 152 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/134b6ccc/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index ed2057a..4ada9c1 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -527,7 +527,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
if (confVar == null) {
throw new MetaException("Invalid configuration key " + key);
}
- return getConf().get(key);
+ return getConf().get(key, confVar.getDefaultValue());
}
/**
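Before this change, a key that was valid but unset on the server made getConf().get(key) return null, and that null propagated to remote clients over Thrift; falling back to the ConfVar's compiled-in default restores the behavior a local metastore had. The client-visible contract, as the new tests below exercise it:

    // Hedged sketch: an unset-but-valid metaconf key now yields its
    // default value instead of null.
    String val = hmsc.getMetaConf(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.toString());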
http://git-wip-us.apache.org/repos/asf/hive/blob/134b6ccc/metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStoreGetMetaConf.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStoreGetMetaConf.java b/metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStoreGetMetaConf.java
new file mode 100644
index 0000000..3f4561c
--- /dev/null
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStoreGetMetaConf.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.security.Permission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.*;
+import org.junit.Before;
+
+public class TestHiveMetaStoreGetMetaConf {
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestHiveMetaStoreGetMetaConf.class);
+ private static final String msPort = "20103";
+ private static HiveConf hiveConf;
+ private static SecurityManager securityManager;
+
+ private HiveMetaStoreClient hmsc;
+
+ public static class NoExitSecurityManager extends SecurityManager {
+
+ @Override
+ public void checkPermission(Permission perm) {
+ // allow anything.
+ }
+
+ @Override
+ public void checkPermission(Permission perm, Object context) {
+ // allow anything.
+ }
+
+ @Override
+ public void checkExit(int status) {
+ super.checkExit(status);
+ throw new RuntimeException("System.exit() was called. Raising exception.");
+ }
+ }
+
+ private static class RunMS implements Runnable {
+
+ @Override
+ public void run() {
+ try {
+ HiveMetaStore.main(new String[]{"-v", "-p", msPort, "--hiveconf",
+ "hive.metastore.expression.proxy=" + MockPartitionExpressionForMetastore.class.getCanonicalName(),
+ "--hiveconf", "hive.metastore.try.direct.sql.ddl=false"});
+ } catch (Throwable t) {
+ LOG.error("Exiting. Got exception from metastore: ", t);
+ }
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ LOG.info("Shutting down metastore.");
+ System.setSecurityManager(securityManager);
+ }
+
+ @BeforeClass
+ public static void startMetaStoreServer() throws Exception {
+
+ securityManager = System.getSecurityManager();
+ System.setSecurityManager(new NoExitSecurityManager());
+
+ hiveConf = new HiveConf(TestHiveMetaStoreGetMetaConf.class);
+ hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:"
+ + msPort);
+ hiveConf.setVar(HiveConf.ConfVars.PREEXECHOOKS, "");
+ hiveConf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, "");
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 10);
+
+ System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " ");
+ System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " ");
+
+ new Thread(new RunMS()).start();
+ }
+
+ @Before
+ public void setup() throws MetaException {
+ hmsc = new HiveMetaStoreClient(hiveConf);
+ }
+
+ @After
+ public void closeClient() {
+ if (hmsc != null) {
+ hmsc.close();
+ }
+ }
+
+ @Test
+ public void testGetMetaConfDefault() throws MetaException, TException {
+ HiveConf.ConfVars metaConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL;
+ String expected = metaConfVar.getDefaultValue();
+ String actual = hmsc.getMetaConf(metaConfVar.toString());
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testGetMetaConfDefaultEmptyString() throws MetaException, TException {
+ HiveConf.ConfVars metaConfVar = HiveConf.ConfVars.METASTORE_PARTITION_NAME_WHITELIST_PATTERN;
+ String expected = "";
+ String actual = hmsc.getMetaConf(metaConfVar.toString());
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testGetMetaConfOverridden() throws MetaException, TException {
+ HiveConf.ConfVars metaConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL_DDL;
+ String expected = "false";
+ String actual = hmsc.getMetaConf(metaConfVar.toString());
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testGetMetaConfUnknownPreperty() throws MetaException, TException {
+ String unknownPropertyName = "hive.meta.foo.bar";
+ thrown.expect(MetaException.class);
+ thrown.expectMessage("Invalid configuration key " + unknownPropertyName);
+ hmsc.getMetaConf(unknownPropertyName);
+ }
+}
[03/11] hive git commit: HIVE-13572 : Redundant setting full file status in Hive::copyFiles (Rui Li via Ashutosh Chauhan)
Posted by jd...@apache.org.
HIVE-13572 : Redundant setting full file status in Hive::copyFiles (Rui Li via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/076f3655
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/076f3655
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/076f3655
Branch: refs/heads/llap
Commit: 076f3655b846f3214bc304e405d321222a3819ab
Parents: 4377c7f
Author: Rui Li <ru...@intel.com>
Authored: Tue Apr 26 18:14:00 2016 -0800
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Thu Apr 28 19:22:13 2016 -0700
----------------------------------------------------------------------
.../apache/hadoop/hive/ql/metadata/Hive.java | 39 +++++++++++---------
.../apache/hadoop/hive/shims/ShimLoader.java | 10 +++--
2 files changed, 28 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/076f3655/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index ab165f1..4d9c3d2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -2641,26 +2641,29 @@ private void constructOneLBLocationMap(FileStatus fSta,
files = new FileStatus[] {src};
}
- for (FileStatus srcFile : files) {
-
- final Path srcP = srcFile.getPath();
- final boolean needToCopy = needToCopy(srcP, destf, srcFs, destFs);
- // Strip off the file type, if any so we don't make:
- // 000000_0.gz -> 000000_0.gz_copy_1
- final String name;
- final String filetype;
- String itemName = srcP.getName();
- int index = itemName.lastIndexOf('.');
- if (index >= 0) {
- filetype = itemName.substring(index);
- name = itemName.substring(0, index);
- } else {
- name = itemName;
- filetype = "";
- }
+ final SessionState parentSession = SessionState.get();
+ for (final FileStatus srcFile : files) {
+
futures.add(pool.submit(new Callable<ObjectPair<Path, Path>>() {
@Override
public ObjectPair<Path, Path> call() throws Exception {
+ SessionState.setCurrentSessionState(parentSession);
+ final Path srcP = srcFile.getPath();
+ final boolean needToCopy = needToCopy(srcP, destf, srcFs, destFs);
+ // Strip off the file type, if any so we don't make:
+ // 000000_0.gz -> 000000_0.gz_copy_1
+ final String name;
+ final String filetype;
+ String itemName = srcP.getName();
+ int index = itemName.lastIndexOf('.');
+ if (index >= 0) {
+ filetype = itemName.substring(index);
+ name = itemName.substring(0, index);
+ } else {
+ name = itemName;
+ filetype = "";
+ }
+
Path destPath = new Path(destf, srcP.getName());
if (!needToCopy && !isSrcLocal) {
for (int counter = 1; !destFs.rename(srcP,destPath); counter++) {
@@ -2671,7 +2674,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
if (inheritPerms) {
- ShimLoader.getHadoopShims().setFullFileStatus(conf, fullDestStatus, destFs, destf);
+ ShimLoader.getHadoopShims().setFullFileStatus(conf, fullDestStatus, destFs, destPath);
}
if (null != newFiles) {
newFiles.add(destPath);
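Two things change here: setFullFileStatus now receives the per-file destPath rather than the directory destf, and the per-file work moves inside the Callable with the caller's SessionState re-established first, since SessionState is thread-local and the pool threads would otherwise see none. A minimal sketch of that hand-off, assuming a generic ExecutorService named pool:

    // Hedged sketch: propagate the caller's thread-local SessionState
    // into worker threads before doing any session-dependent work.
    final SessionState parentSession = SessionState.get(); // caller's thread
    Future<Void> f = pool.submit(new Callable<Void>() {
      @Override
      public Void call() throws Exception {
        SessionState.setCurrentSessionState(parentSession);
        // per-file rename/copy logic runs here, as in the diff above
        return null;
      }
    });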
http://git-wip-us.apache.org/repos/asf/hive/blob/076f3655/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java
index 0fe3169..28d3e48 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java
@@ -31,7 +31,7 @@ import java.util.Map;
public abstract class ShimLoader {
public static String HADOOP23VERSIONNAME = "0.23";
- private static HadoopShims hadoopShims;
+ private static volatile HadoopShims hadoopShims;
private static JettyShims jettyShims;
private static AppenderSkeleton eventCounter;
private static HadoopThriftAuthBridge hadoopThriftAuthBridge;
@@ -88,9 +88,13 @@ public abstract class ShimLoader {
* Factory method to get an instance of HadoopShims based on the
* version of Hadoop on the classpath.
*/
- public static synchronized HadoopShims getHadoopShims() {
+ public static HadoopShims getHadoopShims() {
if (hadoopShims == null) {
- hadoopShims = loadShims(HADOOP_SHIM_CLASSES, HadoopShims.class);
+ synchronized (ShimLoader.class) {
+ if (hadoopShims == null) {
+ hadoopShims = loadShims(HADOOP_SHIM_CLASSES, HadoopShims.class);
+ }
+ }
}
return hadoopShims;
}
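This is the standard double-checked locking idiom: the unsynchronized first check keeps the common already-initialized path lock-free, the second check under the lock prevents duplicate loading, and the volatile added to the field is what makes the idiom sound under the Java memory model, guaranteeing readers never observe a partially constructed HadoopShims. The shape of the idiom, consolidated from the two hunks above:

    // Generic sketch mirroring the ShimLoader change; 'volatile' is essential.
    private static volatile HadoopShims hadoopShims;

    public static HadoopShims getHadoopShims() {
      if (hadoopShims == null) {                  // fast path, no lock
        synchronized (ShimLoader.class) {
          if (hadoopShims == null) {              // re-check under the lock
            hadoopShims = loadShims(HADOOP_SHIM_CLASSES, HadoopShims.class);
          }
        }
      }
      return hadoopShims;
    }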
[10/11] hive git commit: HIVE-13421 : Propagate job progress in operation status
Posted by jd...@apache.org.
HIVE-13421 : Propagate job progress in operation status
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9e1fa0ce
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9e1fa0ce
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9e1fa0ce
Branch: refs/heads/llap
Commit: 9e1fa0ce6f2003300640f0bee9b267e33d714084
Parents: 9179178
Author: Rajat Khandelwal <pr...@apache.org>
Authored: Sat Apr 30 08:16:51 2016 +0530
Committer: Amareshwari Sriramadasu <am...@apache.org>
Committed: Sat Apr 30 08:16:51 2016 +0530
----------------------------------------------------------------------
.../service/cli/session/TestQueryDisplay.java | 4 ++--
.../java/org/apache/hadoop/hive/ql/Driver.java | 13 +++++++++++--
.../org/apache/hadoop/hive/ql/QueryDisplay.java | 19 +++++++++++++++++--
.../org/apache/hadoop/hive/ql/QueryPlan.java | 16 +---------------
.../hadoop/hive/ql/exec/ConditionalTask.java | 1 +
.../org/apache/hadoop/hive/ql/exec/Task.java | 14 ++++++++++++--
.../hadoop/hive/ql/exec/mr/ExecDriver.java | 2 +-
.../hive/ql/exec/mr/HadoopJobExecHelper.java | 6 ++++--
.../apache/hive/service/cli/CLIServiceTest.java | 4 +++-
9 files changed, 52 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
index 98581e0..cc18ce7 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
@@ -154,8 +154,8 @@ public class TestQueryDisplay {
Assert.assertTrue(qDisplay1.getPerfLogStarts(QueryDisplay.Phase.COMPILATION).size() > 0);
Assert.assertTrue(qDisplay1.getPerfLogEnds(QueryDisplay.Phase.COMPILATION).size() > 0);
- Assert.assertEquals(qDisplay1.getTaskDisplays().size(), 2);
- QueryDisplay.TaskDisplay tInfo1 = qDisplay1.getTaskDisplays().get(1);
+ Assert.assertEquals(qDisplay1.getTaskDisplays().size(), 1);
+ QueryDisplay.TaskDisplay tInfo1 = qDisplay1.getTaskDisplays().get(0);
Assert.assertEquals(tInfo1.getTaskId(), "Stage-0");
Assert.assertEquals(tInfo1.getTaskType(), StageType.DDL);
Assert.assertTrue(tInfo1.getBeginTime() > 0 && tInfo1.getBeginTime() <= System.currentTimeMillis());
http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index dad43fb..32d2cb2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -450,7 +450,7 @@ public class Driver implements CommandProcessor {
schema = getSchema(sem, conf);
plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId,
- queryState.getHiveOperation(), schema, queryDisplay);
+ queryState.getHiveOperation(), schema);
conf.setQueryString(queryStr);
@@ -1507,7 +1507,7 @@ public class Driver implements CommandProcessor {
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PRE_HOOK + peh.getClass().getName());
}
}
-
+ setQueryDisplays(plan.getRootTasks());
int mrJobs = Utilities.getMRTasks(plan.getRootTasks()).size();
int jobs = mrJobs
+ Utilities.getTezTasks(plan.getRootTasks()).size()
@@ -1739,6 +1739,15 @@ public class Driver implements CommandProcessor {
return (0);
}
+ private void setQueryDisplays(List<Task<? extends Serializable>> tasks) {
+ if (tasks != null) {
+ for (Task<? extends Serializable> task : tasks) {
+ task.setQueryDisplay(queryDisplay);
+ setQueryDisplays(task.getDependentTasks());
+ }
+ }
+ }
+
private void logMrWarning(int mrJobs) {
if (mrJobs <= 0 || !("mr".equals(HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE)))) {
return;
http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java
index d582bc0..703e997 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java
@@ -79,6 +79,8 @@ public class QueryDisplay {
private String name;
private boolean requireLock;
private boolean retryIfFail;
+ private String statusMessage;
+
// required for jackson
public TaskDisplay() {
@@ -158,15 +160,28 @@ public class QueryDisplay {
if (externalHandle == null && tTask.getExternalHandle() != null) {
this.externalHandle = tTask.getExternalHandle();
}
+ setStatusMessage(tTask.getStatusMessage());
switch (taskState) {
case RUNNING:
- beginTime = System.currentTimeMillis();
+ if (beginTime == null) {
+ beginTime = System.currentTimeMillis();
+ }
break;
case FINISHED:
- endTime = System.currentTimeMillis();
+ if (endTime == null) {
+ endTime = System.currentTimeMillis();
+ }
break;
}
}
+
+ public synchronized String getStatusMessage() {
+ return statusMessage;
+ }
+
+ public synchronized void setStatusMessage(String statusMessage) {
+ this.statusMessage = statusMessage;
+ }
}
public synchronized void setTaskResult(String taskId, TaskResult result) {
TaskDisplay taskDisplay = tasks.get(taskId);
http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
index ef0923d..e8c8ae6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
@@ -113,26 +113,12 @@ public class QueryPlan implements Serializable {
}
public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId,
- HiveOperation operation, Schema resultSchema) {
- this(queryString, sem, startTime, queryId, operation, resultSchema, null);
- }
- public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId,
- HiveOperation operation, Schema resultSchema, QueryDisplay queryDisplay) {
+ HiveOperation operation, Schema resultSchema) {
this.queryString = queryString;
rootTasks = new ArrayList<Task<? extends Serializable>>(sem.getAllRootTasks());
reducerTimeStatsPerJobList = new ArrayList<ReducerTimeStatsPerJob>();
fetchTask = sem.getFetchTask();
- if (queryDisplay != null) {
- if (fetchTask != null) {
- fetchTask.setQueryDisplay(queryDisplay);
- }
- if (rootTasks!= null) {
- for (Task t : rootTasks) {
- t.setQueryDisplay(queryDisplay);
- }
- }
- }
// Note that inputs and outputs can be changed when the query gets executed
inputs = sem.getAllInputs();
outputs = sem.getAllOutputs();
http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
index c96c813..52cb445 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
@@ -200,6 +200,7 @@ public class ConditionalTask extends Task<ConditionalWork> implements Serializab
public boolean addDependentTask(Task<? extends Serializable> dependent) {
boolean ret = false;
if (getListTasks() != null) {
+ ret = true;
for (Task<? extends Serializable> tsk : getListTasks()) {
ret = ret & tsk.addDependentTask(dependent);
}
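The one-line fix matters because ret started at false and was folded with &, and false & x is always false, so addDependentTask reported failure even when every branch task accepted the dependent. Seeding the accumulator with true makes the fold mean "all subtasks succeeded". In miniature, with a hypothetical results array:

    boolean ret = false;
    for (boolean ok : results) ret = ret & ok;  // stuck at false regardless of ok
    ret = true;                                 // AND identity, as in the fix
    for (boolean ok : results) ret = ret & ok;  // true iff every ok is true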
http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index 897af5e..34bdafd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -83,8 +83,18 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
protected String id;
protected T work;
private TaskState taskState = TaskState.CREATED;
+ private String statusMessage;
private transient boolean fetchSource;
+ public void setStatusMessage(String statusMessage) {
+ this.statusMessage = statusMessage;
+ updateStatusInQueryDisplay();
+ }
+
+ public String getStatusMessage() {
+ return statusMessage;
+ }
+
public enum FeedType {
DYNAMIC_PARTITIONS, // list of dynamic partitions
}
@@ -138,13 +148,13 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
this.queryDisplay = queryDisplay;
}
- private void updateStatusInQueryDisplay() {
+ protected void updateStatusInQueryDisplay() {
if (queryDisplay != null) {
queryDisplay.updateTaskStatus(this);
}
}
- private void setState(TaskState state) {
+ protected void setState(TaskState state) {
this.taskState = state;
updateStatusInQueryDisplay();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index 639b0da..926f6e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -432,7 +432,7 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
// Finally SUBMIT the JOB!
rj = jc.submitJob(job);
this.jobID = rj.getJobID();
-
+ updateStatusInQueryDisplay();
returnVal = jobExecHelper.progress(rj, jc);
success = (returnVal == 0);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
index 760ba6c..11f5cfd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
@@ -367,12 +367,14 @@ public class HadoopJobExecHelper {
}
}
console.printInfo(output);
+ task.setStatusMessage(output);
reportTime = System.currentTimeMillis();
}
if (cpuMsec > 0) {
- console.printInfo("MapReduce Total cumulative CPU time: "
- + Utilities.formatMsecToStr(cpuMsec));
+ String status = "MapReduce Total cumulative CPU time: " + Utilities.formatMsecToStr(cpuMsec);
+ console.printInfo(status);
+ task.setStatusMessage(status);
}
boolean success;
http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
index ff7e9a4..fb8ee4c 100644
--- a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
+++ b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
@@ -641,7 +641,8 @@ public abstract class CLIServiceTest {
SessionHandle sessionHandle = setupTestData(tableName, columnDefinitions, confOverlay);
assertNotNull(sessionHandle);
// nonblocking execute
- String select = "SELECT ID + ' ' FROM TEST_EXEC_ASYNC";
+ String select = "select a.id, b.id from (SELECT ID + ' ' `ID` FROM TEST_EXEC_ASYNC) a full outer join "
+ + "(SELECT ID + ' ' `ID` FROM TEST_EXEC_ASYNC) b on a.ID=b.ID";
OperationHandle ophandle =
client.executeStatementAsync(sessionHandle, select, confOverlay);
@@ -697,6 +698,7 @@ public abstract class CLIServiceTest {
case FINISHED:
if (taskDisplay.getTaskType() == StageType.MAPRED || taskDisplay.getTaskType() == StageType.MAPREDLOCAL) {
assertNotNull(taskDisplay.getExternalHandle());
+ assertNotNull(taskDisplay.getStatusMessage());
}
assertNotNull(taskDisplay.getBeginTime());
assertNotNull(taskDisplay.getEndTime());
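With statusMessage plumbed from HadoopJobExecHelper through Task into QueryDisplay, a caller holding the QueryDisplay can now read per-stage progress lines. A hedged sketch using only the types and getters shown in these diffs:

    // queryDisplay is assumed to come from a running Driver's query.
    for (QueryDisplay.TaskDisplay td : queryDisplay.getTaskDisplays()) {
      // for MAPRED/MAPREDLOCAL stages this now carries lines such as
      // "MapReduce Total cumulative CPU time: ..."
      System.out.println(td.getTaskId() + ": " + td.getStatusMessage());
    }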
[02/11] hive git commit: HIVE-13509: HCatalog getSplits should ignore the partition with invalid path (Chaoyu Tang, reviewed by Szehon Ho and Mithun Radhakrishnan)
Posted by jd...@apache.org.
HIVE-13509: HCatalog getSplits should ignore the partition with invalid path (Chaoyu Tang, reviewed by Szehon Ho and Mithun Radhakrishnan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4377c7ff
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4377c7ff
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4377c7ff
Branch: refs/heads/llap
Commit: 4377c7ff03c93f0b1c69305a1b8852f579a3130b
Parents: ba864a2
Author: ctang <ct...@cloudera.com>
Authored: Thu Apr 28 22:10:59 2016 -0400
Committer: ctang <ct...@cloudera.com>
Committed: Thu Apr 28 22:10:59 2016 -0400
----------------------------------------------------------------------
HIVE-13509.2.patch | 478 +++++++++++++++++++
.../hive/hcatalog/common/HCatConstants.java | 3 +
.../hcatalog/mapreduce/HCatBaseInputFormat.java | 29 +-
.../hive/hcatalog/pig/TestHCatLoader.java | 55 +++
4 files changed, 559 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
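The skip behavior is opt-in: with hcat.input.ignore.invalid.path left at its default of false, a partition whose directory has disappeared still fails getSplits as before. A hedged sketch of enabling it on a job, using the constants this patch adds:

    // 'job' is assumed to be an org.apache.hadoop.mapreduce.Job.
    Configuration conf = job.getConfiguration();
    conf.setBoolean(HCatConstants.HCAT_INPUT_IGNORE_INVALID_PATH_KEY, true); // default false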
http://git-wip-us.apache.org/repos/asf/hive/blob/4377c7ff/HIVE-13509.2.patch
----------------------------------------------------------------------
diff --git a/HIVE-13509.2.patch b/HIVE-13509.2.patch
new file mode 100644
index 0000000..930b1f7
--- /dev/null
+++ b/HIVE-13509.2.patch
@@ -0,0 +1,478 @@
+diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
+index 6b03fcb..d165e7e 100644
+--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
++++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
+@@ -208,4 +208,7 @@ private HCatConstants() { // restrict instantiation
+ */
+ public static final String HCAT_INPUT_BAD_RECORD_MIN_KEY = "hcat.input.bad.record.min";
+ public static final int HCAT_INPUT_BAD_RECORD_MIN_DEFAULT = 2;
++
++ public static final String HCAT_INPUT_IGNORE_INVALID_PATH_KEY = "hcat.input.ignore.invalid.path";
++ public static final boolean HCAT_INPUT_IGNORE_INVALID_PATH_DEFAULT = false;
+ }
+diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
+index adfaf4e..dbbdd61 100644
+--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
++++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
+@@ -21,11 +21,11 @@
+
+ import java.io.IOException;
+ import java.util.ArrayList;
++import java.util.Iterator;
+ import java.util.LinkedList;
+ import java.util.Map;
+ import java.util.HashMap;
+ import java.util.List;
+-
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+@@ -127,7 +127,10 @@ public static void setOutputSchema(Job job, HCatSchema hcatSchema)
+ //For each matching partition, call getSplits on the underlying InputFormat
+ for (PartInfo partitionInfo : partitionInfoList) {
+ jobConf = HCatUtil.getJobConfFromContext(jobContext);
+- setInputPath(jobConf, partitionInfo.getLocation());
++ List<String> setInputPath = setInputPath(jobConf, partitionInfo.getLocation());
++ if (setInputPath.isEmpty()) {
++ continue;
++ }
+ Map<String, String> jobProperties = partitionInfo.getJobProperties();
+
+ HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
+@@ -281,7 +284,7 @@ private static InputJobInfo getJobInfo(Configuration conf)
+ return (InputJobInfo) HCatUtil.deserialize(jobString);
+ }
+
+- private void setInputPath(JobConf jobConf, String location)
++ private List<String> setInputPath(JobConf jobConf, String location)
+ throws IOException {
+
+ // ideally we should just call FileInputFormat.setInputPaths() here - but
+@@ -322,19 +325,33 @@ private void setInputPath(JobConf jobConf, String location)
+ }
+ pathStrings.add(location.substring(pathStart, length));
+
+- Path[] paths = StringUtils.stringToPath(pathStrings.toArray(new String[0]));
+ String separator = "";
+ StringBuilder str = new StringBuilder();
+
+- for (Path path : paths) {
++ boolean ignoreInvalidPath =jobConf.getBoolean(HCatConstants.HCAT_INPUT_IGNORE_INVALID_PATH_KEY,
++ HCatConstants.HCAT_INPUT_IGNORE_INVALID_PATH_DEFAULT);
++ Iterator<String> pathIterator = pathStrings.iterator();
++ while (pathIterator.hasNext()) {
++ String pathString = pathIterator.next();
++ if (ignoreInvalidPath && org.apache.commons.lang.StringUtils.isBlank(pathString)) {
++ continue;
++ }
++ Path path = new Path(pathString);
+ FileSystem fs = path.getFileSystem(jobConf);
++ if (ignoreInvalidPath && !fs.exists(path)) {
++ pathIterator.remove();
++ continue;
++ }
+ final String qualifiedPath = fs.makeQualified(path).toString();
+ str.append(separator)
+ .append(StringUtils.escapeString(qualifiedPath));
+ separator = StringUtils.COMMA_STR;
+ }
+
+- jobConf.set("mapred.input.dir", str.toString());
++ if (!ignoreInvalidPath || !pathStrings.isEmpty()) {
++ jobConf.set("mapred.input.dir", str.toString());
++ }
++ return pathStrings;
+ }
+
+ }
+diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
+index 2440cb5..4e23fa2 100644
+--- a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
++++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
+@@ -66,6 +66,7 @@
+ import org.apache.pig.data.Tuple;
+ import org.apache.pig.impl.logicalLayer.schema.Schema;
+ import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
++import org.apache.pig.impl.util.PropertiesUtil;
+ import org.joda.time.DateTime;
+ import org.junit.After;
+ import org.junit.Before;
+@@ -102,6 +103,7 @@
+ add("testReadPartitionedBasic");
+ add("testProjectionsBasic");
+ add("testColumnarStorePushdown2");
++ add("testReadMissingPartitionBasicNeg");
+ }});
+ }};
+
+@@ -438,6 +440,59 @@ public void testReadPartitionedBasic() throws IOException, CommandNeedRetryExcep
+ }
+
+ @Test
++ public void testReadMissingPartitionBasicNeg() throws IOException, CommandNeedRetryException {
++ assumeTrue(!TestUtil.shouldSkip(storageFormat, DISABLED_STORAGE_FORMATS));
++ PigServer server = new PigServer(ExecType.LOCAL);
++
++ File removedPartitionDir = new File(TEST_WAREHOUSE_DIR + "/" + PARTITIONED_TABLE + "/bkt=0");
++ if (!removeDirectory(removedPartitionDir)) {
++ System.out.println("Test did not run because its environment could not be set.");
++ return;
++ }
++ driver.run("select * from " + PARTITIONED_TABLE);
++ ArrayList<String> valuesReadFromHiveDriver = new ArrayList<String>();
++ driver.getResults(valuesReadFromHiveDriver);
++ assertTrue(valuesReadFromHiveDriver.size() == 6);
++
++ server.registerQuery("W = load '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatLoader();");
++ Schema dumpedWSchema = server.dumpSchema("W");
++ List<FieldSchema> Wfields = dumpedWSchema.getFields();
++ assertEquals(3, Wfields.size());
++ assertTrue(Wfields.get(0).alias.equalsIgnoreCase("a"));
++ assertTrue(Wfields.get(0).type == DataType.INTEGER);
++ assertTrue(Wfields.get(1).alias.equalsIgnoreCase("b"));
++ assertTrue(Wfields.get(1).type == DataType.CHARARRAY);
++ assertTrue(Wfields.get(2).alias.equalsIgnoreCase("bkt"));
++ assertTrue(Wfields.get(2).type == DataType.CHARARRAY);
++
++ try {
++ Iterator<Tuple> WIter = server.openIterator("W");
++ fail("Should failed in retriving an invalid partition");
++ } catch (IOException ioe) {
++ // expected
++ }
++ }
++
++ private static boolean removeDirectory(File dir) {
++ boolean success = false;
++ if (dir.isDirectory()) {
++ File[] files = dir.listFiles();
++ if (files != null && files.length > 0) {
++ for (File file : files) {
++ success = removeDirectory(file);
++ if (!success) {
++ return false;
++ }
++ }
++ }
++ success = dir.delete();
++ } else {
++ success = dir.delete();
++ }
++ return success;
++ }
++
++ @Test
+ public void testProjectionsBasic() throws IOException {
+ assumeTrue(!TestUtil.shouldSkip(storageFormat, DISABLED_STORAGE_FORMATS));
+
+diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderWithProps.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderWithProps.java
+new file mode 100644
+index 0000000..41fe79b
+--- /dev/null
++++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderWithProps.java
+@@ -0,0 +1,305 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++package org.apache.hive.hcatalog.pig;
++
++import java.io.File;
++import java.io.FileWriter;
++import java.io.IOException;
++import java.io.PrintWriter;
++import java.io.RandomAccessFile;
++import java.sql.Date;
++import java.sql.Timestamp;
++import java.util.ArrayList;
++import java.util.Collection;
++import java.util.HashMap;
++import java.util.HashSet;
++import java.util.Iterator;
++import java.util.List;
++import java.util.Map;
++import java.util.Properties;
++import java.util.Set;
++
++import org.apache.commons.io.FileUtils;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.FileUtil;
++import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.hive.cli.CliSessionState;
++import org.apache.hadoop.hive.conf.HiveConf;
++import org.apache.hadoop.hive.ql.CommandNeedRetryException;
++import org.apache.hadoop.hive.ql.Driver;
++import org.apache.hadoop.hive.ql.WindowsPathUtil;
++import org.apache.hadoop.hive.ql.io.IOConstants;
++import org.apache.hadoop.hive.ql.io.StorageFormats;
++import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
++import org.apache.hadoop.hive.ql.session.SessionState;
++import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
++import org.apache.hadoop.mapreduce.Job;
++import org.apache.hadoop.util.Shell;
++import org.apache.hive.hcatalog.HcatTestUtils;
++import org.apache.hive.hcatalog.common.HCatUtil;
++import org.apache.hive.hcatalog.common.HCatConstants;
++import org.apache.hive.hcatalog.data.Pair;
++import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
++import org.apache.pig.ExecType;
++import org.apache.pig.PigRunner;
++import org.apache.pig.PigServer;
++import org.apache.pig.ResourceStatistics;
++import org.apache.pig.tools.pigstats.OutputStats;
++import org.apache.pig.tools.pigstats.PigStats;
++import org.apache.pig.data.DataType;
++import org.apache.pig.data.Tuple;
++import org.apache.pig.impl.logicalLayer.schema.Schema;
++import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
++import org.apache.pig.impl.util.PropertiesUtil;
++import org.joda.time.DateTime;
++import org.junit.After;
++import org.junit.Before;
++import org.junit.Test;
++import org.junit.runner.RunWith;
++import org.junit.runners.Parameterized;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import static org.junit.Assert.*;
++import static org.junit.Assume.assumeTrue;
++
++@RunWith(Parameterized.class)
++public class TestHCatLoaderWithProps {
++ private static final Logger LOG = LoggerFactory.getLogger(TestHCatLoaderWithProps.class);
++ private static final String TEST_DATA_DIR = HCatUtil.makePathASafeFileName(System.getProperty("java.io.tmpdir") +
++ File.separator + TestHCatLoaderWithProps.class.getCanonicalName() + "-" + System.currentTimeMillis());
++ private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
++ private static final String BASIC_FILE_NAME = TEST_DATA_DIR + "/basic.input.data";
++
++ private static final String BASIC_TABLE = "junit_unparted_basic";
++ private static final String PARTITIONED_TABLE = "junit_parted_basic";
++
++ private Driver driver;
++ private Map<Integer, Pair<Integer, String>> basicInputData;
++
++ private static final Map<String, Set<String>> DISABLED_STORAGE_FORMATS =
++ new HashMap<String, Set<String>>() {{
++ put(IOConstants.PARQUETFILE, new HashSet<String>() {{
++ add("testReadMissingPartitionBasic");
++ }});
++ }};
++
++ private final String storageFormat;
++
++ @Parameterized.Parameters
++ public static Collection<Object[]> generateParameters() {
++ return StorageFormats.names();
++ }
++
++ public TestHCatLoaderWithProps(String storageFormat) {
++ this.storageFormat = storageFormat;
++ }
++
++ private void dropTable(String tablename) throws IOException, CommandNeedRetryException {
++ dropTable(tablename, driver);
++ }
++
++ static void dropTable(String tablename, Driver driver) throws IOException, CommandNeedRetryException {
++ driver.run("drop table if exists " + tablename);
++ }
++
++ private void createTable(String tablename, String schema, String partitionedBy) throws IOException, CommandNeedRetryException {
++ createTable(tablename, schema, partitionedBy, driver, storageFormat);
++ }
++
++ static void createTable(String tablename, String schema, String partitionedBy, Driver driver, String storageFormat)
++ throws IOException, CommandNeedRetryException {
++ String createTable;
++ createTable = "create table " + tablename + "(" + schema + ") ";
++ if ((partitionedBy != null) && (!partitionedBy.trim().isEmpty())) {
++ createTable = createTable + "partitioned by (" + partitionedBy + ") ";
++ }
++ createTable = createTable + "stored as " +storageFormat;
++ executeStatementOnDriver(createTable, driver);
++ }
++
++ private void createTable(String tablename, String schema) throws IOException, CommandNeedRetryException {
++ createTable(tablename, schema, null);
++ }
++
++ /**
++ * Execute Hive CLI statement
++ * @param cmd arbitrary statement to execute
++ */
++ static void executeStatementOnDriver(String cmd, Driver driver) throws IOException, CommandNeedRetryException {
++ LOG.debug("Executing: " + cmd);
++ CommandProcessorResponse cpr = driver.run(cmd);
++ if(cpr.getResponseCode() != 0) {
++ throw new IOException("Failed to execute \"" + cmd + "\". Driver returned " + cpr.getResponseCode() + " Error: " + cpr.getErrorMessage());
++ }
++ }
++
++ @Before
++ public void setup() throws Exception {
++ File f = new File(TEST_WAREHOUSE_DIR);
++ if (f.exists()) {
++ FileUtil.fullyDelete(f);
++ }
++ if (!(new File(TEST_WAREHOUSE_DIR).mkdirs())) {
++ throw new RuntimeException("Could not create " + TEST_WAREHOUSE_DIR);
++ }
++
++ HiveConf hiveConf = new HiveConf(this.getClass());
++ hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
++ hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
++ hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
++ hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR);
++ hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict");
++
++ if (Shell.WINDOWS) {
++ WindowsPathUtil.convertPathsFromWindowsToHdfs(hiveConf);
++ }
++
++ driver = new Driver(hiveConf);
++ SessionState.start(new CliSessionState(hiveConf));
++
++ createTable(BASIC_TABLE, "a int, b string");
++ createTable(PARTITIONED_TABLE, "a int, b string", "bkt string");
++
++ int LOOP_SIZE = 3;
++ String[] input = new String[LOOP_SIZE * LOOP_SIZE];
++ basicInputData = new HashMap<Integer, Pair<Integer, String>>();
++ int k = 0;
++ for (int i = 1; i <= LOOP_SIZE; i++) {
++ String si = i + "";
++ for (int j = 1; j <= LOOP_SIZE; j++) {
++ String sj = "S" + j + "S";
++ input[k] = si + "\t" + sj;
++ basicInputData.put(k, new Pair<Integer, String>(i, sj));
++ k++;
++ }
++ }
++ HcatTestUtils.createTestDataFile(BASIC_FILE_NAME, input);
++
++ PigServer server = new PigServer(ExecType.LOCAL);
++ server.setBatchOn();
++ int i = 0;
++ server.registerQuery("A = load '" + BASIC_FILE_NAME + "' as (a:int, b:chararray);", ++i);
++
++ server.registerQuery("store A into '" + BASIC_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer();", ++i);
++ server.registerQuery("B = foreach A generate a,b;", ++i);
++ server.registerQuery("B2 = filter B by a < 2;", ++i);
++ server.registerQuery("store B2 into '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer('bkt=0');", ++i);
++
++ server.registerQuery("C = foreach A generate a,b;", ++i);
++ server.registerQuery("C2 = filter C by a >= 2;", ++i);
++ server.registerQuery("store C2 into '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer('bkt=1');", ++i);
++
++ server.executeBatch();
++ }
++
++ @After
++ public void tearDown() throws Exception {
++ try {
++ if (driver != null) {
++ dropTable(BASIC_TABLE);
++ dropTable(PARTITIONED_TABLE);
++ }
++ } finally {
++ FileUtils.deleteDirectory(new File(TEST_DATA_DIR));
++ }
++ }
++
++ @Test
++ public void testReadMissingPartitionBasic() throws IOException, CommandNeedRetryException {
++ assumeTrue(!TestUtil.shouldSkip(storageFormat, DISABLED_STORAGE_FORMATS));
++ Properties pigProperties = PropertiesUtil.loadDefaultProperties();
++ pigProperties.setProperty("hcat.input.ignore.invalid.path", "true");
++ PigServer server = new PigServer(ExecType.LOCAL, pigProperties);
++
++ File removedPartitionDir = new File(TEST_WAREHOUSE_DIR + "/" + PARTITIONED_TABLE + "/bkt=0");
++ if (!removeDirectory(removedPartitionDir)) {
++ System.out.println("Test did not run because its environment could not be set.");
++ return;
++ }
++ driver.run("select * from " + PARTITIONED_TABLE);
++ ArrayList<String> valuesReadFromHiveDriver = new ArrayList<String>();
++ driver.getResults(valuesReadFromHiveDriver);
++ assertTrue(valuesReadFromHiveDriver.size() == 6);
++
++ server.registerQuery("W = load '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatLoader();");
++ Schema dumpedWSchema = server.dumpSchema("W");
++ List<FieldSchema> Wfields = dumpedWSchema.getFields();
++ assertEquals(3, Wfields.size());
++ assertTrue(Wfields.get(0).alias.equalsIgnoreCase("a"));
++ assertTrue(Wfields.get(0).type == DataType.INTEGER);
++ assertTrue(Wfields.get(1).alias.equalsIgnoreCase("b"));
++ assertTrue(Wfields.get(1).type == DataType.CHARARRAY);
++ assertTrue(Wfields.get(2).alias.equalsIgnoreCase("bkt"));
++ assertTrue(Wfields.get(2).type == DataType.CHARARRAY);
++
++ Iterator<Tuple> WIter = server.openIterator("W");
++ Collection<Pair<Integer, String>> valuesRead = new ArrayList<Pair<Integer, String>>();
++ while (WIter.hasNext()) {
++ Tuple t = WIter.next();
++ assertTrue(t.size() == 3);
++ assertNotNull(t.get(0));
++ assertNotNull(t.get(1));
++ assertNotNull(t.get(2));
++ assertTrue(t.get(0).getClass() == Integer.class);
++ assertTrue(t.get(1).getClass() == String.class);
++ assertTrue(t.get(2).getClass() == String.class);
++ valuesRead.add(new Pair<Integer, String>((Integer) t.get(0), (String) t.get(1)));
++ // the returned partition value is always 1
++ assertEquals("1", t.get(2));
++ }
++ assertEquals(valuesReadFromHiveDriver.size(), valuesRead.size());
++
++ server.registerQuery("P1 = load '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatLoader();");
++ server.registerQuery("P1filter = filter P1 by bkt == '0';");
++ Iterator<Tuple> P1Iter = server.openIterator("P1filter");
++ assertFalse(P1Iter.hasNext());
++
++ server.registerQuery("P2 = load '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatLoader();");
++ server.registerQuery("P2filter = filter P2 by bkt == '1';");
++ Iterator<Tuple> P2Iter = server.openIterator("P2filter");
++ int count2 = 0;
++ while (P2Iter.hasNext()) {
++ Tuple t = P2Iter.next();
++ assertEquals("1", t.get(2));
++ assertTrue(((Integer) t.get(0)) > 1);
++ count2++;
++ }
++ assertEquals(6, count2);
++ }
++
++ private static boolean removeDirectory(File dir) {
++ boolean success = false;
++ if (dir.isDirectory()) {
++ File[] files = dir.listFiles();
++ if (files != null && files.length > 0) {
++ for (File file : files) {
++ success = removeDirectory(file);
++ if (!success) {
++ return false;
++ }
++ }
++ }
++ success = dir.delete();
++ } else {
++ success = dir.delete();
++ }
++ return success;
++ }
++}
http://git-wip-us.apache.org/repos/asf/hive/blob/4377c7ff/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
index 6b03fcb..d165e7e 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
@@ -208,4 +208,7 @@ public final class HCatConstants {
*/
public static final String HCAT_INPUT_BAD_RECORD_MIN_KEY = "hcat.input.bad.record.min";
public static final int HCAT_INPUT_BAD_RECORD_MIN_DEFAULT = 2;
+
+ public static final String HCAT_INPUT_IGNORE_INVALID_PATH_KEY = "hcat.input.ignore.invalid.path";
+ public static final boolean HCAT_INPUT_IGNORE_INVALID_PATH_DEFAULT = false;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/4377c7ff/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
index adfaf4e..dbbdd61 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
@@ -21,11 +21,11 @@ package org.apache.hive.hcatalog.mapreduce;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.HashMap;
import java.util.List;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -127,7 +127,10 @@ public abstract class HCatBaseInputFormat
//For each matching partition, call getSplits on the underlying InputFormat
for (PartInfo partitionInfo : partitionInfoList) {
jobConf = HCatUtil.getJobConfFromContext(jobContext);
- setInputPath(jobConf, partitionInfo.getLocation());
+ List<String> setInputPath = setInputPath(jobConf, partitionInfo.getLocation());
+ if (setInputPath.isEmpty()) {
+ continue;
+ }
Map<String, String> jobProperties = partitionInfo.getJobProperties();
HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
@@ -281,7 +284,7 @@ public abstract class HCatBaseInputFormat
return (InputJobInfo) HCatUtil.deserialize(jobString);
}
- private void setInputPath(JobConf jobConf, String location)
+ private List<String> setInputPath(JobConf jobConf, String location)
throws IOException {
// ideally we should just call FileInputFormat.setInputPaths() here - but
@@ -322,19 +325,33 @@ public abstract class HCatBaseInputFormat
}
pathStrings.add(location.substring(pathStart, length));
- Path[] paths = StringUtils.stringToPath(pathStrings.toArray(new String[0]));
String separator = "";
StringBuilder str = new StringBuilder();
- for (Path path : paths) {
+ boolean ignoreInvalidPath = jobConf.getBoolean(HCatConstants.HCAT_INPUT_IGNORE_INVALID_PATH_KEY,
+ HCatConstants.HCAT_INPUT_IGNORE_INVALID_PATH_DEFAULT);
+ Iterator<String> pathIterator = pathStrings.iterator();
+ while (pathIterator.hasNext()) {
+ String pathString = pathIterator.next();
+ if (ignoreInvalidPath && org.apache.commons.lang.StringUtils.isBlank(pathString)) {
+ continue;
+ }
+ Path path = new Path(pathString);
FileSystem fs = path.getFileSystem(jobConf);
+ if (ignoreInvalidPath && !fs.exists(path)) {
+ pathIterator.remove();
+ continue;
+ }
final String qualifiedPath = fs.makeQualified(path).toString();
str.append(separator)
.append(StringUtils.escapeString(qualifiedPath));
separator = StringUtils.COMMA_STR;
}
- jobConf.set("mapred.input.dir", str.toString());
+ if (!ignoreInvalidPath || !pathStrings.isEmpty()) {
+ jobConf.set("mapred.input.dir", str.toString());
+ }
+ return pathStrings;
}
}
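
For illustration, a minimal client-side sketch of the new knob, reusing the Pig classes exercised by TestHCatLoaderWithProps above; the wrapper class name and table name here are illustrative only. With the property left at its default (false), a load over a partition whose directory has been removed still fails, as the negative test further down demonstrates.

import java.util.Properties;
import org.apache.hive.hcatalog.common.HCatConstants;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.impl.util.PropertiesUtil;

public class IgnoreInvalidPathSketch {
  public static void main(String[] args) throws Exception {
    Properties pigProperties = PropertiesUtil.loadDefaultProperties();
    // Key resolves to "hcat.input.ignore.invalid.path" (default false).
    pigProperties.setProperty(HCatConstants.HCAT_INPUT_IGNORE_INVALID_PATH_KEY, "true");
    PigServer server = new PigServer(ExecType.LOCAL, pigProperties);
    // With the flag on, partitions whose directories no longer exist are
    // skipped by HCatBaseInputFormat.setInputPath() instead of failing the job.
    server.registerQuery("W = load 'junit_parted_basic' using org.apache.hive.hcatalog.pig.HCatLoader();");
    server.dumpSchema("W");
  }
}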
http://git-wip-us.apache.org/repos/asf/hive/blob/4377c7ff/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
----------------------------------------------------------------------
diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
index 2440cb5..4e23fa2 100644
--- a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
+++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
@@ -66,6 +66,7 @@ import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.util.PropertiesUtil;
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Before;
@@ -102,6 +103,7 @@ public class TestHCatLoader {
add("testReadPartitionedBasic");
add("testProjectionsBasic");
add("testColumnarStorePushdown2");
+ add("testReadMissingPartitionBasicNeg");
}});
}};
@@ -438,6 +440,59 @@ public class TestHCatLoader {
}
@Test
+ public void testReadMissingPartitionBasicNeg() throws IOException, CommandNeedRetryException {
+ assumeTrue(!TestUtil.shouldSkip(storageFormat, DISABLED_STORAGE_FORMATS));
+ PigServer server = new PigServer(ExecType.LOCAL);
+
+ File removedPartitionDir = new File(TEST_WAREHOUSE_DIR + "/" + PARTITIONED_TABLE + "/bkt=0");
+ if (!removeDirectory(removedPartitionDir)) {
+ System.out.println("Test did not run because its environment could not be set.");
+ return;
+ }
+ driver.run("select * from " + PARTITIONED_TABLE);
+ ArrayList<String> valuesReadFromHiveDriver = new ArrayList<String>();
+ driver.getResults(valuesReadFromHiveDriver);
+ assertTrue(valuesReadFromHiveDriver.size() == 6);
+
+ server.registerQuery("W = load '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatLoader();");
+ Schema dumpedWSchema = server.dumpSchema("W");
+ List<FieldSchema> Wfields = dumpedWSchema.getFields();
+ assertEquals(3, Wfields.size());
+ assertTrue(Wfields.get(0).alias.equalsIgnoreCase("a"));
+ assertTrue(Wfields.get(0).type == DataType.INTEGER);
+ assertTrue(Wfields.get(1).alias.equalsIgnoreCase("b"));
+ assertTrue(Wfields.get(1).type == DataType.CHARARRAY);
+ assertTrue(Wfields.get(2).alias.equalsIgnoreCase("bkt"));
+ assertTrue(Wfields.get(2).type == DataType.CHARARRAY);
+
+ try {
+ Iterator<Tuple> WIter = server.openIterator("W");
+ fail("Should failed in retriving an invalid partition");
+ } catch (IOException ioe) {
+ // expected
+ }
+ }
+
+ private static boolean removeDirectory(File dir) {
+ boolean success = false;
+ if (dir.isDirectory()) {
+ File[] files = dir.listFiles();
+ if (files != null && files.length > 0) {
+ for (File file : files) {
+ success = removeDirectory(file);
+ if (!success) {
+ return false;
+ }
+ }
+ }
+ success = dir.delete();
+ } else {
+ success = dir.delete();
+ }
+ return success;
+ }
+
+ @Test
public void testProjectionsBasic() throws IOException {
assumeTrue(!TestUtil.shouldSkip(storageFormat, DISABLED_STORAGE_FORMATS));
[07/11] hive git commit: HIVE-13596 : HS2 should be able to get UDFs
on demand from metastore (Sergey Shelukhin, reviewed by Jason Dere)
Posted by jd...@apache.org.
HIVE-13596 : HS2 should be able to get UDFs on demand from metastore (Sergey Shelukhin, reviewed by Jason Dere)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6460529a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6460529a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6460529a
Branch: refs/heads/llap
Commit: 6460529aa910d7dd4af499d4adf85cbcf67452ce
Parents: 1289aff
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Apr 29 11:01:34 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Apr 29 11:01:34 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 3 +
.../apache/hadoop/hive/ql/exec/Registry.java | 323 ++++++++++++-------
.../hadoop/hive/ql/session/SessionState.java | 3 +-
3 files changed, 218 insertions(+), 111 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/6460529a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index db4b9e8..46a3b96 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2322,6 +2322,9 @@ public class HiveConf extends Configuration {
HIVE_SERVER2_BUILTIN_UDF_BLACKLIST("hive.server2.builtin.udf.blacklist", "",
"Comma separated list of udfs names. These udfs will not be allowed in queries." +
" The udf black list takes precedence over udf white list"),
+ HIVE_ALLOW_UDF_LOAD_ON_DEMAND("hive.allow.udf.load.on.demand", false,
+ "Whether enable loading UDFs from metastore on demand; this is mostly relevant for\n" +
+ "HS2 and was the default behavior before Hive 1.2. Off by default."),
HIVE_SERVER2_SESSION_CHECK_INTERVAL("hive.server2.session.check.interval", "6h",
new TimeValidator(TimeUnit.MILLISECONDS, 3000l, true, null, false),
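
A hypothetical snippet showing the new flag being flipped programmatically (the class name is illustrative; the same effect comes from setting hive.allow.udf.load.on.demand=true in hive-site.xml):

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

public class UdfOnDemandSketch {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Off by default; when enabled, Registry (below) falls back to a
    // metastore lookup for permanent functions it has not yet loaded.
    conf.setBoolVar(ConfVars.HIVE_ALLOW_UDF_LOAD_ON_DEMAND, true);
    System.out.println(HiveConf.getBoolVar(conf, ConfVars.HIVE_ALLOW_UDF_LOAD_ON_DEMAND));
  }
}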
http://git-wip-us.apache.org/repos/asf/hive/blob/6460529a/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java
index d5f4a37..3b54b49 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java
@@ -24,8 +24,12 @@ import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.Function;
import org.apache.hadoop.hive.ql.exec.FunctionInfo.FunctionResource;
import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -46,7 +50,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.IOException;
-import java.net.URLClassLoader;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -56,6 +59,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
@@ -80,14 +84,16 @@ public class Registry {
private final Set<ClassLoader> mSessionUDFLoaders = new LinkedHashSet<ClassLoader>();
private final boolean isNative;
+ /**
+ * The epic lock for the registry. This was added to replace the synchronized methods with
+ * minimum disruption; the locking should really be made more granular here.
+ */
+ private final ReentrantLock lock = new ReentrantLock();
- Registry(boolean isNative) {
+ public Registry(boolean isNative) {
this.isNative = isNative;
}
- public Registry() {
- this(false);
- }
/**
* Registers the appropriate kind of temporary function based on a class's
@@ -157,11 +163,16 @@ public class Registry {
* Registers the UDF class as a built-in function; used for dynamically created UDFs, like
* GenericUDFOP*Minus/Plus.
*/
- public synchronized void registerHiddenBuiltIn(Class<? extends GenericUDF> functionClass) {
- if (!isNative) {
- throw new RuntimeException("Builtin is not for this registry");
+ public void registerHiddenBuiltIn(Class<? extends GenericUDF> functionClass) {
+ lock.lock();
+ try {
+ if (!isNative) {
+ throw new RuntimeException("Builtin is not for this registry");
+ }
+ builtIns.add(functionClass);
+ } finally {
+ lock.unlock();
}
- builtIns.add(functionClass);
}
public FunctionInfo registerGenericUDTF(String functionName,
@@ -256,23 +267,29 @@ public class Registry {
* @param functionName
* @return
*/
- public synchronized FunctionInfo getFunctionInfo(String functionName) throws SemanticException {
- functionName = functionName.toLowerCase();
- if (FunctionUtils.isQualifiedFunctionName(functionName)) {
- return getQualifiedFunctionInfo(functionName);
- }
- // First try without qualifiers - would resolve builtin/temp functions.
- // Otherwise try qualifying with current db name.
- FunctionInfo functionInfo = mFunctions.get(functionName);
- if (functionInfo != null && functionInfo.isBlockedFunction()) {
- throw new SemanticException ("UDF " + functionName + " is not allowed");
- }
- if (functionInfo == null) {
- String qualifiedName = FunctionUtils.qualifyFunctionName(
- functionName, SessionState.get().getCurrentDatabase().toLowerCase());
- functionInfo = getQualifiedFunctionInfo(qualifiedName);
- }
+ public FunctionInfo getFunctionInfo(String functionName) throws SemanticException {
+ lock.lock();
+ try {
+ functionName = functionName.toLowerCase();
+ if (FunctionUtils.isQualifiedFunctionName(functionName)) {
+ return getQualifiedFunctionInfoUnderLock(functionName);
+ }
+ // First try without qualifiers - would resolve builtin/temp functions.
+ // Otherwise try qualifying with current db name.
+ FunctionInfo functionInfo = mFunctions.get(functionName);
+ if (functionInfo != null && functionInfo.isBlockedFunction()) {
+ throw new SemanticException ("UDF " + functionName + " is not allowed");
+ }
+ if (functionInfo == null) {
+ String qualifiedName = FunctionUtils.qualifyFunctionName(
+ functionName, SessionState.get().getCurrentDatabase().toLowerCase());
+ functionInfo = getQualifiedFunctionInfoUnderLock(qualifiedName);
+ }
return functionInfo;
+ } finally {
+ lock.unlock();
+ }
+
}
public WindowFunctionInfo getWindowFunctionInfo(String functionName) throws SemanticException {
@@ -295,15 +312,23 @@ public class Registry {
return udfClass != null && persistent.containsKey(udfClass);
}
- public synchronized Set<String> getCurrentFunctionNames() {
- return getFunctionNames((Pattern)null);
+ public Set<String> getCurrentFunctionNames() {
+ lock.lock();
+ try {
+ return getFunctionNames((Pattern)null);
+ } finally {
+ lock.unlock();
+ }
}
- public synchronized Set<String> getFunctionNames(String funcPatternStr) {
+ public Set<String> getFunctionNames(String funcPatternStr) {
+ lock.lock();
try {
return getFunctionNames(Pattern.compile(funcPatternStr));
} catch (PatternSyntaxException e) {
return Collections.emptySet();
+ } finally {
+ lock.unlock();
}
}
@@ -315,17 +340,22 @@ public class Registry {
* @param funcPattern regular expression of the interested function names
* @return set of strings contains function names
*/
- public synchronized Set<String> getFunctionNames(Pattern funcPattern) {
- Set<String> funcNames = new TreeSet<String>();
- for (String funcName : mFunctions.keySet()) {
- if (funcName.contains(WINDOW_FUNC_PREFIX)) {
- continue;
- }
- if (funcPattern == null || funcPattern.matcher(funcName).matches()) {
- funcNames.add(funcName);
+ public Set<String> getFunctionNames(Pattern funcPattern) {
+ lock.lock();
+ try {
+ Set<String> funcNames = new TreeSet<String>();
+ for (String funcName : mFunctions.keySet()) {
+ if (funcName.contains(WINDOW_FUNC_PREFIX)) {
+ continue;
+ }
+ if (funcPattern == null || funcPattern.matcher(funcName).matches()) {
+ funcNames.add(funcName);
+ }
}
+ return funcNames;
+ } finally {
+ lock.unlock();
}
- return funcNames;
}
/**
@@ -334,18 +364,23 @@ public class Registry {
* @param funcInfo
* @param synonyms
*/
- public synchronized void getFunctionSynonyms(
+ public void getFunctionSynonyms(
String funcName, FunctionInfo funcInfo, Set<String> synonyms) throws SemanticException {
- Class<?> funcClass = funcInfo.getFunctionClass();
- for (Map.Entry<String, FunctionInfo> entry : mFunctions.entrySet()) {
- String name = entry.getKey();
- if (name.contains(WINDOW_FUNC_PREFIX) || name.equals(funcName)) {
- continue;
- }
- FunctionInfo function = entry.getValue();
- if (function.getFunctionClass() == funcClass) {
- synonyms.add(name);
+ lock.lock();
+ try {
+ Class<?> funcClass = funcInfo.getFunctionClass();
+ for (Map.Entry<String, FunctionInfo> entry : mFunctions.entrySet()) {
+ String name = entry.getKey();
+ if (name.contains(WINDOW_FUNC_PREFIX) || name.equals(funcName)) {
+ continue;
+ }
+ FunctionInfo function = entry.getValue();
+ if (function.getFunctionClass() == funcClass) {
+ synonyms.add(name);
+ }
}
+ } finally {
+ lock.unlock();
}
}
@@ -409,26 +444,31 @@ public class Registry {
return ((GenericUDAFResolver2) udafResolver).getEvaluator(paramInfo);
}
- private synchronized void addFunction(String functionName, FunctionInfo function) {
- if (isNative ^ function.isNative()) {
- throw new RuntimeException("Function " + functionName + " is not for this registry");
- }
- functionName = functionName.toLowerCase();
- FunctionInfo prev = mFunctions.get(functionName);
- if (prev != null) {
- if (isBuiltInFunc(prev.getFunctionClass())) {
- throw new RuntimeException("Function " + functionName + " is hive builtin function, " +
- "which cannot be overriden.");
+ private void addFunction(String functionName, FunctionInfo function) {
+ lock.lock();
+ try {
+ if (isNative != function.isNative()) {
+ throw new RuntimeException("Function " + functionName + " is not for this registry");
}
- prev.discarded();
- }
- mFunctions.put(functionName, function);
- if (function.isBuiltIn()) {
- builtIns.add(function.getFunctionClass());
- } else if (function.isPersistent()) {
- Class<?> functionClass = getPermanentUdfClass(function);
- Integer refCount = persistent.get(functionClass);
- persistent.put(functionClass, Integer.valueOf(refCount == null ? 1 : refCount + 1));
+ functionName = functionName.toLowerCase();
+ FunctionInfo prev = mFunctions.get(functionName);
+ if (prev != null) {
+ if (isBuiltInFunc(prev.getFunctionClass())) {
+ throw new RuntimeException("Function " + functionName + " is hive builtin function, " +
+ "which cannot be overriden.");
+ }
+ prev.discarded();
+ }
+ mFunctions.put(functionName, function);
+ if (function.isBuiltIn()) {
+ builtIns.add(function.getFunctionClass());
+ } else if (function.isPersistent()) {
+ Class<?> functionClass = getPermanentUdfClass(function);
+ Integer refCount = persistent.get(functionClass);
+ persistent.put(functionClass, Integer.valueOf(refCount == null ? 1 : refCount + 1));
+ }
+ } finally {
+ lock.unlock();
}
}
@@ -446,23 +486,27 @@ public class Registry {
return functionClass;
}
- public synchronized void unregisterFunction(String functionName) throws HiveException {
- functionName = functionName.toLowerCase();
- FunctionInfo fi = mFunctions.get(functionName);
- if (fi != null) {
- if (fi.isBuiltIn()) {
- throw new HiveException(ErrorMsg.DROP_NATIVE_FUNCTION.getMsg(functionName));
- }
- mFunctions.remove(functionName);
- fi.discarded();
- if (fi.isPersistent()) {
- removePersistentFunction(fi);
+ public void unregisterFunction(String functionName) throws HiveException {
+ lock.lock();
+ try {
+ functionName = functionName.toLowerCase();
+ FunctionInfo fi = mFunctions.get(functionName);
+ if (fi != null) {
+ if (fi.isBuiltIn()) {
+ throw new HiveException(ErrorMsg.DROP_NATIVE_FUNCTION.getMsg(functionName));
+ }
+ mFunctions.remove(functionName);
+ fi.discarded();
+ if (fi.isPersistent()) {
+ removePersistentFunctionUnderLock(fi);
+ }
}
+ } finally {
+ lock.unlock();
}
}
- /** Should only be called from synchronized methods. */
- private void removePersistentFunction(FunctionInfo fi) {
+ private void removePersistentFunctionUnderLock(FunctionInfo fi) {
Class<?> functionClass = getPermanentUdfClass(fi);
Integer refCount = persistent.get(functionClass);
assert refCount != null;
@@ -478,10 +522,15 @@ public class Registry {
* @param dbName database name
* @throws HiveException
*/
- public synchronized void unregisterFunctions(String dbName) throws HiveException {
- Set<String> funcNames = getFunctionNames(dbName.toLowerCase() + "\\..*");
- for (String funcName : funcNames) {
- unregisterFunction(funcName);
+ public void unregisterFunctions(String dbName) throws HiveException {
+ lock.lock();
+ try {
+ Set<String> funcNames = getFunctionNames(dbName.toLowerCase() + "\\..*");
+ for (String funcName : funcNames) {
+ unregisterFunction(funcName);
+ }
+ } finally {
+ lock.unlock();
}
}
@@ -493,7 +542,7 @@ public class Registry {
return null;
}
- private FunctionInfo getQualifiedFunctionInfo(String qualifiedName) throws SemanticException {
+ private FunctionInfo getQualifiedFunctionInfoUnderLock(String qualifiedName) throws SemanticException {
FunctionInfo info = mFunctions.get(qualifiedName);
if (info != null && info.isBlockedFunction()) {
throw new SemanticException ("UDF " + qualifiedName + " is not allowed");
@@ -509,7 +558,24 @@ public class Registry {
if (isNative && info != null && info.isPersistent()) {
return registerToSessionRegistry(qualifiedName, info);
}
- return info;
+ if (info != null || !isNative) {
+ return info; // We have the UDF, or we are in the session registry (or both).
+ }
+ // If we are in the system registry and this feature is enabled, try to get it from metastore.
+ SessionState ss = SessionState.get();
+ HiveConf conf = (ss == null) ? null : ss.getConf();
+ if (conf == null || !HiveConf.getBoolVar(conf, ConfVars.HIVE_ALLOW_UDF_LOAD_ON_DEMAND)) {
+ return null;
+ }
+ // This is a little bit weird. We'll do the MS call outside of the lock. Our caller calls us
+ // under lock, so we'd preserve the lock state for them; their finally block will release the
+ // lock correctly. See the comment on the lock field - the locking needs to be reworked.
+ lock.unlock();
+ try {
+ return getFunctionInfoFromMetastoreNoLock(qualifiedName, conf);
+ } finally {
+ lock.lock();
+ }
}
// should be called after session registry is checked
@@ -545,40 +611,52 @@ public class Registry {
return ret;
}
- private void checkFunctionClass(FunctionInfo cfi) throws ClassNotFoundException {
- // This call will fail for non-generic UDFs using GenericUDFBridge
- Class<?> udfClass = cfi.getFunctionClass();
- // Even if we have a reference to the class (which will be the case for GenericUDFs),
- // the classloader may not be able to resolve the class, which would mean reflection-based
- // methods would fail such as for plan deserialization. Make sure this works too.
- Class.forName(udfClass.getName(), true, Utilities.getSessionSpecifiedClassLoader());
- }
-
- public synchronized void clear() {
- if (isNative) {
- throw new IllegalStateException("System function registry cannot be cleared");
+ public void clear() {
+ lock.lock();
+ try {
+ if (isNative) {
+ throw new IllegalStateException("System function registry cannot be cleared");
+ }
+ mFunctions.clear();
+ builtIns.clear();
+ persistent.clear();
+ } finally {
+ lock.unlock();
}
- mFunctions.clear();
- builtIns.clear();
- persistent.clear();
}
- public synchronized void closeCUDFLoaders() {
+ public void closeCUDFLoaders() {
+ lock.lock();
try {
- for(ClassLoader loader: mSessionUDFLoaders) {
- JavaUtils.closeClassLoader(loader);
+ try {
+ for(ClassLoader loader: mSessionUDFLoaders) {
+ JavaUtils.closeClassLoader(loader);
+ }
+ } catch (IOException ie) {
+ LOG.error("Error in close loader: " + ie);
}
- } catch (IOException ie) {
- LOG.error("Error in close loader: " + ie);
+ mSessionUDFLoaders.clear();
+ } finally {
+ lock.unlock();
}
- mSessionUDFLoaders.clear();
}
- public synchronized void addToUDFLoaders(ClassLoader loader) {
- mSessionUDFLoaders.add(loader);
+ public void addToUDFLoaders(ClassLoader loader) {
+ lock.lock();
+ try {
+ mSessionUDFLoaders.add(loader);
+ } finally {
+ lock.unlock();
+ }
}
- public synchronized void removeFromUDFLoaders(ClassLoader loader) {
- mSessionUDFLoaders.remove(loader);
+
+ public void removeFromUDFLoaders(ClassLoader loader) {
+ lock.lock();
+ try {
+ mSessionUDFLoaders.remove(loader);
+ } finally {
+ lock.unlock();
+ }
}
/**
@@ -611,4 +689,29 @@ public class Registry {
return blackList.contains(functionName) ||
(!whiteList.isEmpty() && !whiteList.contains(functionName));
}
+
+ /**
+ * This is called outside of the lock. Some of the methods that are called transitively by
+ * this (e.g. addFunction) will take the lock again and then release it, which is ok.
+ */
+ private FunctionInfo getFunctionInfoFromMetastoreNoLock(String functionName, HiveConf conf) {
+ try {
+ String[] parts = FunctionUtils.getQualifiedFunctionNameParts(functionName);
+ Function func = Hive.get(conf).getFunction(parts[0].toLowerCase(), parts[1]);
+ if (func == null) {
+ return null;
+ }
+ // Found UDF in metastore - now add it to the function registry.
+ FunctionInfo fi = registerPermanentFunction(functionName, func.getClassName(), true,
+ FunctionTask.toFunctionResource(func.getResourceUris()));
+ if (fi == null) {
+ LOG.error(func.getClassName() + " is not a valid UDF class and was not registered");
+ return null;
+ }
+ return fi;
+ } catch (Throwable e) {
+ LOG.info("Unable to look up " + functionName + " in metastore", e);
+ }
+ return null;
+ }
}
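
Most of this change is mechanical. A minimal sketch of the pattern, with illustrative names: each formerly synchronized method in Registry now takes this shape, which is what lets getQualifiedFunctionInfoUnderLock release the lock around the slow metastore call.

import java.util.concurrent.locks.ReentrantLock;

public class LockRefactorSketch {
  private final ReentrantLock lock = new ReentrantLock();

  // Before: public synchronized void doWork() { ... }
  public void doWork() {
    lock.lock();
    try {
      // critical section formerly guarded by the object monitor
    } finally {
      lock.unlock();
    }
  }
}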
http://git-wip-us.apache.org/repos/asf/hive/blob/6460529a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 672df63..d211eb2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -259,7 +259,7 @@ public class SessionState {
*/
private final Set<String> preReloadableAuxJars = new HashSet<String>();
- private final Registry registry = new Registry();
+ private final Registry registry;
/**
* CURRENT_TIMESTAMP value for query
@@ -352,6 +352,7 @@ public class SessionState {
public SessionState(HiveConf conf, String userName) {
this.sessionConf = conf;
this.userName = userName;
+ this.registry = new Registry(false);
if (LOG.isDebugEnabled()) {
LOG.debug("SessionState user: " + userName);
}
[04/11] hive git commit: HIVE-13469. LLAP: Support delayed scheduling
for locality. (Siddharth Seth, reviewed by Prasanth Jayachandran,
Sergey Shelukhin)
Posted by jd...@apache.org.
HIVE-13469. LLAP: Support delayed scheduling for locality. (Siddharth Seth, reviewed by Prasanth Jayachandran, Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/347a5a55
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/347a5a55
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/347a5a55
Branch: refs/heads/llap
Commit: 347a5a5580742a36a875bd6a5f2ac8acd74d3cbf
Parents: 076f365
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Apr 29 15:14:27 2016 +0530
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Apr 29 15:14:27 2016 +0530
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 2 +-
.../hive/llap/tezplugins/ContainerFactory.java | 3 +-
.../tezplugins/LlapTaskSchedulerService.java | 377 ++++++++--
.../llap/tezplugins/helpers/MonotonicClock.java | 24 +
.../scheduler/LoggingFutureCallback.java | 44 ++
.../TestLlapTaskSchedulerService.java | 734 ++++++++++++++++++-
6 files changed, 1068 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/347a5a55/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 566e9b6..fd725cb 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2762,7 +2762,7 @@ public class HiveConf extends Configuration {
new TimeValidator(TimeUnit.MILLISECONDS, -1l, true, Long.MAX_VALUE, true),
"Amount of time to wait before allocating a request which contains location information," +
" to a location other than the ones requested. Set to -1 for an infinite delay, 0" +
- "for a no delay. Currently these are the only two supported values"
+ "for no delay."
),
LLAP_DAEMON_TASK_PREEMPTION_METRICS_INTERVALS(
"hive.llap.daemon.task.preemption.metrics.intervals", "30,60,300",
http://git-wip-us.apache.org/repos/asf/hive/blob/347a5a55/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java
index a314391..f1feec7 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java
@@ -37,11 +37,10 @@ class ContainerFactory {
}
public Container createContainer(Resource capability, Priority priority, String hostname,
- int port) {
+ int port, String nodeHttpAddress) {
ContainerId containerId =
ContainerId.newContainerId(customAppAttemptId, nextId.getAndIncrement());
NodeId nodeId = NodeId.newInstance(hostname, port);
- String nodeHttpAddress = "hostname:0"; // TODO: include UI ports
Container container =
Container.newInstance(containerId, nodeId, nodeHttpAddress, capability, priority, null);
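
For context, the updated call site in the scheduler (excerpted from the LlapTaskSchedulerService hunk further down) now threads the node's real services address through instead of the hard-coded "hostname:0" placeholder:

Container container =
    containerFactory.createContainer(resourcePerExecutor, taskInfo.priority,
        nsPair.getServiceInstance().getHost(),
        nsPair.getServiceInstance().getRpcPort(),
        nsPair.getServiceInstance().getServicesAddress());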
http://git-wip-us.apache.org/repos/asf/hive/blob/347a5a55/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index c3d3a1d..da1e17f 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -50,7 +50,6 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
@@ -62,6 +61,8 @@ import org.apache.hadoop.hive.llap.registry.ServiceInstance;
import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener;
import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock;
+import org.apache.hadoop.hive.llap.tezplugins.scheduler.LoggingFutureCallback;
import org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerMetrics;
import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.yarn.api.records.Container;
@@ -77,7 +78,6 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults;
@@ -91,6 +91,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
private static final Logger LOG = LoggerFactory.getLogger(LlapTaskSchedulerService.class);
+ private static final TaskStartComparator TASK_INFO_COMPARATOR = new TaskStartComparator();
+
private final Configuration conf;
// interface into the registry service
@@ -104,6 +106,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
// Tracks tasks which could not be allocated immediately.
@VisibleForTesting
+ // Tasks are tracked in the order requests come in, at different priority levels.
+ // TODO HIVE-13538 For tasks at the same priority level, it may be worth attempting to schedule tasks with
+ // locality information before those without locality information
final TreeMap<Priority, List<TaskInfo>> pendingTasks = new TreeMap<>(new Comparator<Priority>() {
@Override
public int compare(Priority o1, Priority o2) {
@@ -113,23 +118,30 @@ public class LlapTaskSchedulerService extends TaskScheduler {
// Tracks running and queued tasks. Cleared after a task completes.
private final ConcurrentMap<Object, TaskInfo> knownTasks = new ConcurrentHashMap<>();
+ // Tracks tasks which are running. Useful for selecting a task to preempt based on when it started.
private final TreeMap<Integer, TreeSet<TaskInfo>> runningTasks = new TreeMap<>();
- private static final TaskStartComparator TASK_INFO_COMPARATOR = new TaskStartComparator();
+
// Queue for disabled nodes. Nodes make it out of this queue when their expiration timeout is hit.
@VisibleForTesting
final DelayQueue<NodeInfo> disabledNodesQueue = new DelayQueue<>();
+ @VisibleForTesting
+ final DelayQueue<TaskInfo> delayedTaskQueue = new DelayQueue<>();
- private final boolean forceLocation;
private final ContainerFactory containerFactory;
private final Random random = new Random();
- private final Clock clock;
+ @VisibleForTesting
+ final Clock clock;
private final ListeningExecutorService nodeEnabledExecutor;
private final NodeEnablerCallable nodeEnablerCallable =
new NodeEnablerCallable();
+ private final ListeningExecutorService delayedTaskSchedulerExecutor;
+ @VisibleForTesting
+ final DelayedTaskSchedulerCallable delayedTaskSchedulerCallable;
+
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
@@ -147,6 +159,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
private final Map<String, MutableInt> pendingPreemptionsPerHost = new HashMap<>();
private final NodeBlacklistConf nodeBlacklistConf;
+ private final LocalityDelayConf localityDelayConf;
// Per daemon
private final int memoryPerInstance;
@@ -168,6 +181,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
private final LlapRegistryService registry = new LlapRegistryService(false);
private volatile ListenableFuture<Void> nodeEnablerFuture;
+ private volatile ListenableFuture<Void> delayedTaskSchedulerFuture;
private volatile ListenableFuture<Void> schedulerFuture;
@VisibleForTesting
@@ -181,7 +195,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
private final JvmPauseMonitor pauseMonitor;
public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) {
- this(taskSchedulerContext, new SystemClock(), true);
+ this(taskSchedulerContext, new MonotonicClock(), true);
}
@VisibleForTesting
@@ -189,6 +203,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
boolean initMetrics) {
super(taskSchedulerContext);
this.clock = clock;
+ this.delayedTaskSchedulerCallable = createDelayedTaskSchedulerCallable();
try {
this.conf = TezUtils.createConfFromUserPayload(taskSchedulerContext.getInitialUserPayload());
} catch (IOException e) {
@@ -197,6 +212,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
this.containerFactory = new ContainerFactory(taskSchedulerContext.getApplicationAttemptId(),
taskSchedulerContext.getCustomClusterIdentifier());
+ // TODO HIVE-13483 Get all of these properties from the registry. This will need to take care of different instances
+ // publishing potentially different values when we support changing configurations dynamically.
+ // For now, this can simply be fetched from a single registry instance.
this.memoryPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB);
this.coresPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE);
this.executorsPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
@@ -212,11 +230,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
long localityDelayMs = HiveConf
.getTimeVar(conf, ConfVars.LLAP_TASK_SCHEDULER_LOCALITY_DELAY, TimeUnit.MILLISECONDS);
- if (localityDelayMs == -1) {
- this.forceLocation = true;
- } else {
- this.forceLocation = false;
- }
+
+ this.localityDelayConf = new LocalityDelayConf(localityDelayMs);
this.timeoutMonitor = new SchedulerTimeoutMonitor();
this.timeout = HiveConf.getTimeVar(conf,
@@ -240,6 +255,12 @@ public class LlapTaskSchedulerService extends TaskScheduler {
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapSchedulerNodeEnabler").build());
nodeEnabledExecutor = MoreExecutors.listeningDecorator(executorServiceRaw);
+ ExecutorService delayedTaskSchedulerExecutorRaw = Executors.newFixedThreadPool(1,
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapSchedulerDelayedTaskHandler")
+ .build());
+ delayedTaskSchedulerExecutor =
+ MoreExecutors.listeningDecorator(delayedTaskSchedulerExecutorRaw);
+
ExecutorService schedulerExecutorServiceRaw = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapScheduler").build());
schedulerExecutor = MoreExecutors.listeningDecorator(schedulerExecutorServiceRaw);
@@ -266,7 +287,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
+ ", vCoresPerInstance=" + coresPerInstance + ", executorsPerInstance="
+ executorsPerInstance + ", resourcePerInstanceInferred=" + resourcePerExecutor
+ ", nodeBlacklistConf=" + nodeBlacklistConf
- + ", forceLocation=" + forceLocation);
+ + ", localityDelayMs=" + localityDelayMs);
}
@Override
@@ -279,29 +300,16 @@ public class LlapTaskSchedulerService extends TaskScheduler {
writeLock.lock();
try {
nodeEnablerFuture = nodeEnabledExecutor.submit(nodeEnablerCallable);
- Futures.addCallback(nodeEnablerFuture, new FutureCallback<Void>() {
- @Override
- public void onSuccess(Void result) {
- LOG.info("NodeEnabledThread exited");
- }
+ Futures.addCallback(nodeEnablerFuture, new LoggingFutureCallback("NodeEnablerThread", LOG));
+
+ delayedTaskSchedulerFuture =
+ delayedTaskSchedulerExecutor.submit(delayedTaskSchedulerCallable);
+ Futures.addCallback(delayedTaskSchedulerFuture,
+ new LoggingFutureCallback("DelayedTaskSchedulerThread", LOG));
- @Override
- public void onFailure(Throwable t) {
- LOG.warn("NodeEnabledThread exited with error", t);
- }
- });
schedulerFuture = schedulerExecutor.submit(schedulerCallable);
- Futures.addCallback(schedulerFuture, new FutureCallback<Void>() {
- @Override
- public void onSuccess(Void result) {
- LOG.info("SchedulerThread exited");
- }
+ Futures.addCallback(schedulerFuture, new LoggingFutureCallback("SchedulerThread", LOG));
- @Override
- public void onFailure(Throwable t) {
- LOG.warn("SchedulerThread exited with error", t);
- }
- });
registry.start();
registry.registerStateChangeListener(new NodeStateChangeListener());
activeInstances = registry.getInstances();
@@ -399,6 +407,12 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
timeoutExecutor.shutdownNow();
+ delayedTaskSchedulerCallable.shutdown();
+ if (delayedTaskSchedulerFuture != null) {
+ delayedTaskSchedulerFuture.cancel(true);
+ }
+ delayedTaskSchedulerExecutor.shutdownNow();
+
schedulerCallable.shutdown();
if (schedulerFuture != null) {
schedulerFuture.cancel(true);
@@ -502,6 +516,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
@Override
public void blacklistNode(NodeId nodeId) {
LOG.info("BlacklistNode not supported");
+ // TODO HIVE-13484 What happens when we try scheduling a task on a node that Tez at this point thinks is blacklisted.
}
@Override
@@ -513,7 +528,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
public void allocateTask(Object task, Resource capability, String[] hosts, String[] racks,
Priority priority, Object containerSignature, Object clientCookie) {
TaskInfo taskInfo =
- new TaskInfo(task, clientCookie, priority, capability, hosts, racks, clock.getTime());
+ new TaskInfo(localityDelayConf, clock, task, clientCookie, priority, capability, hosts, racks, clock.getTime());
writeLock.lock();
try {
dagStats.registerTaskRequest(hosts, racks);
@@ -530,7 +545,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
// Container affinity can be implemented as Host affinity for LLAP. Not required until
// 1:1 edges are used in Hive.
TaskInfo taskInfo =
- new TaskInfo(task, clientCookie, priority, capability, null, null, clock.getTime());
+ new TaskInfo(localityDelayConf, clock, task, clientCookie, priority, capability, null, null, clock.getTime());
writeLock.lock();
try {
dagStats.registerTaskRequest(null, null);
@@ -558,7 +573,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
return false;
}
if (taskInfo.containerId == null) {
- if (taskInfo.assigned) {
+ if (taskInfo.getState() == TaskInfo.State.ASSIGNED) {
LOG.error("Task: "
+ task
+ " assigned, but could not find the corresponding containerId."
@@ -577,7 +592,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
assert nodeInfo != null;
// Re-enable the node if preempted
- if (taskInfo.preempted) {
+ if (taskInfo.getState() == TaskInfo.State.PREEMPTED) {
LOG.info("Processing deallocateTask for {} which was preempted, EndReason={}", task, endReason);
unregisterPendingPreemption(taskInfo.assignedInstance.getHost());
nodeInfo.registerUnsuccessfulTaskEnd(true);
@@ -607,7 +622,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
// In case of success, trigger a scheduling run for pending tasks.
trySchedulingPendingTasks();
- } else if (!taskSucceeded) {
+ } else { // Task Failed
nodeInfo.registerUnsuccessfulTaskEnd(false);
if (endReason != null && EnumSet
.of(TaskAttemptEndReason.EXECUTOR_BUSY, TaskAttemptEndReason.COMMUNICATION_ERROR)
@@ -665,17 +680,13 @@ public class LlapTaskSchedulerService extends TaskScheduler {
return true;
}
- private ExecutorService createAppCallbackExecutorService() {
- return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
- .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build());
- }
-
/**
* @param request the list of preferred hosts. null implies any host
* @return
*/
private SelectHostResult selectHost(TaskInfo request) {
String[] requestedHosts = request.requestedHosts;
+ long schedulerAttemptTime = clock.getTime();
readLock.lock(); // Read-lock. Not updating any stats at the moment.
try {
// If there's no memory available, fail
@@ -683,32 +694,61 @@ public class LlapTaskSchedulerService extends TaskScheduler {
return SELECT_HOST_RESULT_INADEQUATE_TOTAL_CAPACITY;
}
+ boolean shouldDelayForLocality = request.shouldDelayForLocality(schedulerAttemptTime);
if (requestedHosts != null && requestedHosts.length > 0) {
int prefHostCount = -1;
- boolean requestedHostExists = false;
+ boolean requestedHostsWillBecomeAvailable = false;
for (String host : requestedHosts) {
prefHostCount++;
// Pick the first host always. Weak attempt at cache affinity.
Set<ServiceInstance> instances = activeInstances.getByHost(host);
if (!instances.isEmpty()) {
- requestedHostExists = true;
for (ServiceInstance inst : instances) {
NodeInfo nodeInfo = instanceToNodeMap.get(inst.getWorkerIdentity());
- if (nodeInfo != null && nodeInfo.canAcceptTask()) {
- LOG.info("Assigning " + inst + " when looking for " + host + "." +
- " FirstRequestedHost=" + (prefHostCount == 0) +
- (requestedHosts.length > 1 ? "#prefLocations=" + requestedHosts.length : ""));
- return new SelectHostResult(inst, nodeInfo);
+ if (nodeInfo != null) {
+ if (nodeInfo.canAcceptTask()) {
+ // Successfully scheduled.
+ LOG.info(
+ "Assigning " + nodeToString(inst, nodeInfo) + " when looking for " + host +
+ ". local=true" + " FirstRequestedHost=" + (prefHostCount == 0) +
+ (requestedHosts.length > 1 ? ", #prefLocations=" + requestedHosts.length :
+ ""));
+ return new SelectHostResult(inst, nodeInfo);
+ } else {
+ // The node cannot accept a task at the moment.
+ if (shouldDelayForLocality) {
+ // Perform some checks on whether the node will become available or not.
+ if (request.shouldForceLocality()) {
+ requestedHostsWillBecomeAvailable = true;
+ } else {
+ if (nodeInfo.getEnableTime() > request.getLocalityDelayTimeout() &&
+ nodeInfo.isDisabled() && nodeInfo.hadCommFailure()) {
+ // This node will likely be activated after the task timeout expires.
+ } else {
+ // Worth waiting for the timeout.
+ requestedHostsWillBecomeAvailable = true;
+ }
+ }
+ }
+ }
+ } else {
+ LOG.warn(
+ "Null NodeInfo when attempting to get host with worker identity {}, and host {}",
+ inst.getWorkerIdentity(), host);
+ // Leave requestedHostsWillBecomeAvailable as is. If some other host is found - delay,
+ // else ends up allocating to a random host immediately.
}
}
}
}
// Check if forcing the location is required.
- if (forceLocation) {
- if (requestedHostExists) {
+ if (shouldDelayForLocality) {
+ if (requestedHostsWillBecomeAvailable) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping non-local location allocation for [" + request.task +
- "] when trying to allocate on [" + Arrays.toString(requestedHosts) + "]");
+ "] when trying to allocate on [" + Arrays.toString(requestedHosts) + "]" +
+ ". ScheduleAttemptTime=" + schedulerAttemptTime + ", taskDelayTimeout=" +
+ request.getLocalityDelayTimeout());
}
return SELECT_HOST_RESULT_DELAYED_LOCALITY;
} else {
@@ -729,10 +769,11 @@ public class LlapTaskSchedulerService extends TaskScheduler {
for (int i = 0; i < all.length; i++) {
Entry<String, NodeInfo> inst = all[(i + n) % all.length];
if (inst.getValue().canAcceptTask()) {
- LOG.info("Assigning " + inst + " when looking for any host, from #hosts=" + all.length +
- ", requestedHosts=" +
- ((requestedHosts == null || requestedHosts.length == 0) ? "null" :
- Arrays.toString(requestedHosts)));
+ LOG.info(
+ "Assigning " + nodeToString(inst.getValue().getServiceInstance(), inst.getValue()) +
+ " when looking for any host, from #hosts=" + all.length + ", requestedHosts=" +
+ ((requestedHosts == null || requestedHosts.length == 0) ? "null" :
+ Arrays.toString(requestedHosts)));
return new SelectHostResult(inst.getValue().getServiceInstance(), inst.getValue());
}
}
@@ -820,6 +861,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
tasksAtPriority = new LinkedList<>();
pendingTasks.put(taskInfo.priority, tasksAtPriority);
}
+ // Delayed tasks will not kick in right now. That will happen in the scheduling loop.
tasksAtPriority.add(taskInfo);
knownTasks.putIfAbsent(taskInfo.task, taskInfo);
if (metrics != null) {
@@ -870,7 +912,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
try {
TaskInfo taskInfo = knownTasks.remove(task);
if (taskInfo != null) {
- if (taskInfo.assigned) {
+ if (taskInfo.getState() == TaskInfo.State.ASSIGNED) {
// Remove from the running list.
int priority = taskInfo.priority.getPriority();
Set<TaskInfo> tasksAtPriority = runningTasks.get(priority);
@@ -925,6 +967,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
taskInfo.triedAssigningTask();
ScheduleResult scheduleResult = scheduleTask(taskInfo);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ScheduleResult for Task: {} = {}", taskInfo, scheduleResult);
+ }
if (scheduleResult == ScheduleResult.SCHEDULED) {
taskIter.remove();
} else {
@@ -938,6 +983,11 @@ public class LlapTaskSchedulerService extends TaskScheduler {
// Preempt only if there's no pending preemptions to avoid preempting twice for a task.
String[] potentialHosts;
if (scheduleResult == ScheduleResult.DELAYED_LOCALITY) {
+
+ // Add the task to the delayed task queue if it does not already exist.
+ maybeAddToDelayedTaskQueue(taskInfo);
+
+ // Try preempting a lower priority task in any case.
// preempt only on specific hosts, if no preemptions already exist on those.
potentialHosts = taskInfo.requestedHosts;
//Protect against a bad location being requested.
@@ -1008,7 +1058,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
Container container =
containerFactory.createContainer(resourcePerExecutor, taskInfo.priority,
nsPair.getServiceInstance().getHost(),
- nsPair.getServiceInstance().getRpcPort());
+ nsPair.getServiceInstance().getRpcPort(),
+ nsPair.getServiceInstance().getServicesAddress());
writeLock.lock(); // While updating local structures
try {
LOG.info("Assigned task {} to container {}", taskInfo, container.getId());
@@ -1125,9 +1176,81 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
}
+ private void maybeAddToDelayedTaskQueue(TaskInfo taskInfo) {
+ // There's no point adding a task with forceLocality set - since that will never exit the queue.
+ // Add other tasks if they are not already in the queue.
+ if (!taskInfo.shouldForceLocality() && !taskInfo.isInDelayedQueue()) {
+ taskInfo.setInDelayedQueue(true);
+ delayedTaskQueue.add(taskInfo);
+ }
+ }
+
+ private String nodeToString(ServiceInstance serviceInstance, NodeInfo nodeInfo) {
+ return serviceInstance.getHost() + ":" + serviceInstance.getRpcPort() + ", workerIdentity=" +
+ serviceInstance.getWorkerIdentity() + ", webAddress=" +
+ serviceInstance.getServicesAddress() + ", currentlyScheduledTasksOnNode=" + nodeInfo.numScheduledTasks;
+ }
+
+
+
+ // ------ Inner classes defined after this point ------
+
+ @VisibleForTesting
+ class DelayedTaskSchedulerCallable implements Callable<Void> {
+
+ private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+
+ @Override
+ public Void call() {
+ while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
+ try {
+ TaskInfo taskInfo = getNextTask();
+ taskInfo.setInDelayedQueue(false);
+ // Tasks can exist in the delayed queue even after they have been scheduled.
+ // Trigger scheduling only if the task is still in PENDING state.
+ processEvictedTask(taskInfo);
+
+ } catch (InterruptedException e) {
+ if (isShutdown.get()) {
+ LOG.info("DelayedTaskScheduler thread interrupted after shutdown");
+ break;
+ } else {
+ LOG.warn("DelayedTaskScheduler thread interrupted before being shutdown");
+ throw new RuntimeException(
+ "DelayedTaskScheduler thread interrupted without being shutdown", e);
+ }
+ }
+ }
+ return null;
+ }
+
+ public void shutdown() {
+ isShutdown.set(true);
+ }
+
+ public TaskInfo getNextTask() throws InterruptedException {
+ return delayedTaskQueue.take();
+ }
+
+ public void processEvictedTask(TaskInfo taskInfo) {
+ if (shouldScheduleTask(taskInfo)) {
+ trySchedulingPendingTasks();
+ }
+ }
+
+ public boolean shouldScheduleTask(TaskInfo taskInfo) {
+ return taskInfo.getState() == TaskInfo.State.PENDING;
+ }
+ }
+
+ @VisibleForTesting
+ DelayedTaskSchedulerCallable createDelayedTaskSchedulerCallable() {
+ return new DelayedTaskSchedulerCallable();
+ }
+
private class NodeEnablerCallable implements Callable<Void> {
- private AtomicBoolean isShutdown = new AtomicBoolean(false);
+ private final AtomicBoolean isShutdown = new AtomicBoolean(false);
private static final long REFRESH_INTERVAL = 10000l;
long nextPollInterval = REFRESH_INTERVAL;
long lastRefreshTime;
@@ -1135,13 +1258,13 @@ public class LlapTaskSchedulerService extends TaskScheduler {
@Override
public Void call() {
- lastRefreshTime = System.currentTimeMillis();
+ lastRefreshTime = LlapTaskSchedulerService.this.clock.getTime();
while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
try {
while (true) {
NodeInfo nodeInfo = disabledNodesQueue.poll(nextPollInterval, TimeUnit.MILLISECONDS);
if (nodeInfo != null) {
- long currentTime = System.currentTimeMillis();
+ long currentTime = LlapTaskSchedulerService.this.clock.getTime();
// A node became available. Enable the node and try scheduling.
reenableDisabledNode(nodeInfo);
trySchedulingPendingTasks();
@@ -1152,7 +1275,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
if (nextPollInterval < 0 || nodeInfo == null) {
// timeout expired. Reset the poll interval and refresh nodes.
nextPollInterval = REFRESH_INTERVAL;
- lastRefreshTime = System.currentTimeMillis();
+ lastRefreshTime = LlapTaskSchedulerService.this.clock.getTime();
// TODO Get rid of this polling once we have notifications from the registry sub-system
if (LOG.isDebugEnabled()) {
LOG.debug("Refreshing instances based on poll interval");
@@ -1232,6 +1355,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
// will be handled in the next run.
// A new request may come in right after this is set to false, but before the actual scheduling.
// This will be handled in this run, but will cause an immediate run after, which is harmless.
+ // This is mainly to handle a trySchedule request that arrives in the middle of a run - since
+ // the event which triggered it may not be processed for all tasks in the run.
pendingScheduleInvocations.set(false);
// Schedule outside of the scheduleLock - which should only be used to wait on the condition.
schedulePendingTasks();
@@ -1245,6 +1370,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
}
+ // ------ Additional static classes defined after this point ------
+
@VisibleForTesting
static class NodeInfo implements Delayed {
private final NodeBlacklistConf blacklistConf;
@@ -1257,6 +1384,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
float cumulativeBackoffFactor = 1.0f;
// Indicates whether a node had a recent communication failure.
+ // This is primarily for tracking and logging purposes for the moment.
+ // TODO At some point, treat task rejection and communication failures differently.
private boolean hadCommFailure = false;
// Indicates whether a node is disabled - for whatever reason - commFailure, busy, etc.
@@ -1375,6 +1504,13 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
}
+ /**
+ * @return the time at which this node will be re-enabled
+ */
+ public long getEnableTime() {
+ return expireTimeMillis;
+ }
+
public boolean isDisabled() {
return disabled;
}
@@ -1382,13 +1518,20 @@ public class LlapTaskSchedulerService extends TaskScheduler {
public boolean hadCommFailure() {
return hadCommFailure;
}
+
/* Returning true does not guarantee that the task will run, considering other queries
may be running in the system. Also depends upon the capacity usage configuration
*/
public boolean canAcceptTask() {
boolean result = !hadCommFailure && !disabled && serviceInstance.isAlive()
&&(numSchedulableTasks == -1 || ((numSchedulableTasks - numScheduledTasks) > 0));
- LOG.info("canAcceptTask={}, numScheduledTasks={}, numSchedulableTasks={}, hadCommFailure={}, disabled={}, serviceInstance.isAlive={}", result, numScheduledTasks, numSchedulableTasks, hadCommFailure, disabled, serviceInstance.isAlive());
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Node[" + serviceInstance.getHost() + ":" + serviceInstance.getRpcPort() + ", " +
+ serviceInstance.getWorkerIdentity() + "]: " +
+ "canAcceptTask={}, numScheduledTasks={}, numSchedulableTasks={}, hadCommFailure={}, disabled={}, serviceInstance.isAlive={}",
+ result, numScheduledTasks, numSchedulableTasks, hadCommFailure, disabled,
+ serviceInstance.isAlive());
+ }
return result;
}
@@ -1512,11 +1655,23 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
}
- private static class TaskInfo {
+
+ // TODO There needs to be a mechanism to figure out different attempts for the same task. Delays
+ // could potentially be changed based on this.
+
+ @VisibleForTesting
+ static class TaskInfo implements Delayed {
+
+ enum State {
+ PENDING, ASSIGNED, PREEMPTED
+ }
+
// IDs used to ensure two TaskInfos are different without using the underlying task instance.
// Required for insertion into a TreeMap
static final AtomicLong ID_GEN = new AtomicLong(0);
final long uniqueId;
+ final LocalityDelayConf localityDelayConf;
+ final Clock clock;
final Object task;
final Object clientCookie;
final Priority priority;
@@ -1524,19 +1679,22 @@ public class LlapTaskSchedulerService extends TaskScheduler {
final String[] requestedHosts;
final String[] requestedRacks;
final long requestTime;
+ final long localityDelayTimeout;
long startTime;
long preemptTime;
ContainerId containerId;
ServiceInstance assignedInstance;
- private boolean assigned = false;
- private boolean preempted = false;
+ private State state = State.PENDING;
+ boolean inDelayedQueue = false;
private int numAssignAttempts = 0;
// TaskInfo instances for two different tasks will not be the same. Only a single instance should
// ever be created for a taskAttempt
- public TaskInfo(Object task, Object clientCookie, Priority priority, Resource capability,
+ public TaskInfo(LocalityDelayConf localityDelayConf, Clock clock, Object task, Object clientCookie, Priority priority, Resource capability,
String[] hosts, String[] racks, long requestTime) {
+ this.localityDelayConf = localityDelayConf;
+ this.clock = clock;
this.task = task;
this.clientCookie = clientCookie;
this.priority = priority;
@@ -1544,30 +1702,61 @@ public class LlapTaskSchedulerService extends TaskScheduler {
this.requestedHosts = hosts;
this.requestedRacks = racks;
this.requestTime = requestTime;
+ if (localityDelayConf.getNodeLocalityDelay() == -1) {
+ localityDelayTimeout = Long.MAX_VALUE;
+ } else if (localityDelayConf.getNodeLocalityDelay() == 0) {
+ localityDelayTimeout = 0L;
+ } else {
+ localityDelayTimeout = requestTime + localityDelayConf.getNodeLocalityDelay();
+ }
this.uniqueId = ID_GEN.getAndIncrement();
}
- void setAssignmentInfo(ServiceInstance instance, ContainerId containerId, long startTime) {
+ synchronized void setAssignmentInfo(ServiceInstance instance, ContainerId containerId, long startTime) {
this.assignedInstance = instance;
- this.containerId = containerId;
+ this.containerId = containerId;
this.startTime = startTime;
- assigned = true;
+ this.state = State.ASSIGNED;
}
- void setPreemptedInfo(long preemptTime) {
- this.preempted = true;
- this.assigned = false;
+ synchronized void setPreemptedInfo(long preemptTime) {
+ this.state = State.PREEMPTED;
this.preemptTime = preemptTime;
}
- void triedAssigningTask() {
+ synchronized void setInDelayedQueue(boolean val) {
+ this.inDelayedQueue = val;
+ }
+
+ synchronized void triedAssigningTask() {
numAssignAttempts++;
}
- int getNumPreviousAssignAttempts() {
+ synchronized int getNumPreviousAssignAttempts() {
return numAssignAttempts;
}
+ synchronized State getState() {
+ return state;
+ }
+
+ synchronized boolean isInDelayedQueue() {
+ return inDelayedQueue;
+ }
+
+ boolean shouldDelayForLocality(long schedulerAttemptTime) {
+ // getDelay <=0 means the task will be evicted from the queue.
+ return localityDelayTimeout > schedulerAttemptTime;
+ }
+
+ boolean shouldForceLocality() {
+ return localityDelayTimeout == Long.MAX_VALUE;
+ }
+
+ long getLocalityDelayTimeout() {
+ return localityDelayTimeout;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -1602,8 +1791,26 @@ public class LlapTaskSchedulerService extends TaskScheduler {
", containerId=" + containerId +
", assignedInstance=" + assignedInstance +
", uniqueId=" + uniqueId +
+ ", localityDelayTimeout=" + localityDelayTimeout +
'}';
}
+
+ @Override
+ public long getDelay(TimeUnit unit) {
+ return unit.convert(localityDelayTimeout - clock.getTime(), TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public int compareTo(Delayed o) {
+ TaskInfo other = (TaskInfo) o;
+ if (other.localityDelayTimeout > this.localityDelayTimeout) {
+ return -1;
+ } else if (other.localityDelayTimeout < this.localityDelayTimeout) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
}
// Newer tasks first.
@@ -1689,4 +1896,24 @@ public class LlapTaskSchedulerService extends TaskScheduler {
'}';
}
}
+
+ @VisibleForTesting
+ static final class LocalityDelayConf {
+ private final long nodeLocalityDelay;
+
+ public LocalityDelayConf(long nodeLocalityDelay) {
+ this.nodeLocalityDelay = nodeLocalityDelay;
+ }
+
+ public long getNodeLocalityDelay() {
+ return nodeLocalityDelay;
+ }
+
+ @Override
+ public String toString() {
+ return "LocalityDelayConf{" +
+ "nodeLocalityDelay=" + nodeLocalityDelay +
+ '}';
+ }
+ }
}
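For context on the delayedTaskQueue machinery added above: TaskInfo now implements java.util.concurrent.Delayed, so a DelayQueue can hold tasks that requested locality and release them only once their localityDelayTimeout passes (with a configured delay of -1 mapping to Long.MAX_VALUE for forced locality, and 0 disabling the delay). The following is a minimal standalone sketch of that contract - the DelayedTask class and its names are illustrative, not part of the patch:

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueSketch {

  // Minimal stand-in for TaskInfo: an element becomes visible to take()/poll()
  // only once getDelay() <= 0, and compareTo() orders elements by deadline.
  static class DelayedTask implements Delayed {
    final String name;
    final long deadlineMillis; // absolute deadline, like localityDelayTimeout

    DelayedTask(String name, long deadlineMillis) {
      this.name = name;
      this.deadlineMillis = deadlineMillis;
    }

    @Override
    public long getDelay(TimeUnit unit) {
      return unit.convert(deadlineMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
      return Long.compare(deadlineMillis, ((DelayedTask) o).deadlineMillis);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    long now = System.currentTimeMillis();
    DelayQueue<DelayedTask> queue = new DelayQueue<>();
    queue.add(new DelayedTask("late", now + 600));
    queue.add(new DelayedTask("early", now + 200));
    // take() blocks until the earliest deadline expires, so "early" is
    // released first (after ~200ms), then "late" (after ~600ms).
    System.out.println(queue.take().name);
    System.out.println(queue.take().name);
  }
}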
http://git-wip-us.apache.org/repos/asf/hive/blob/347a5a55/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/MonotonicClock.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/MonotonicClock.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/MonotonicClock.java
new file mode 100644
index 0000000..aaa6f91
--- /dev/null
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/MonotonicClock.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.tezplugins.helpers;
+
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.util.Clock;
+
+public class MonotonicClock implements Clock {
+ @Override
+ public long getTime() {
+ return Time.monotonicNow();
+ }
+}
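The new MonotonicClock wraps Hadoop's Time.monotonicNow(), which is backed by System.nanoTime() rather than the wall clock, so the scheduler's delay and timeout arithmetic is immune to NTP corrections and manual clock changes. A short standalone sketch (illustrative only, not part of the patch) of the difference when measuring an interval:

public class MonotonicSketch {
  public static void main(String[] args) throws InterruptedException {
    long wallStart = System.currentTimeMillis();
    long monoStart = System.nanoTime();
    Thread.sleep(100);
    // If the wall clock is stepped backwards mid-sleep (NTP, an operator),
    // the first delta can even be negative; the nanoTime-based delta is
    // always genuine elapsed time.
    System.out.println("wall delta (ms): " + (System.currentTimeMillis() - wallStart));
    System.out.println("mono delta (ms): " + (System.nanoTime() - monoStart) / 1_000_000);
  }
}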
http://git-wip-us.apache.org/repos/asf/hive/blob/347a5a55/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/scheduler/LoggingFutureCallback.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/scheduler/LoggingFutureCallback.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/scheduler/LoggingFutureCallback.java
new file mode 100644
index 0000000..ea700da
--- /dev/null
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/scheduler/LoggingFutureCallback.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins.scheduler;
+
+import java.util.concurrent.CancellationException;
+
+import com.google.common.util.concurrent.FutureCallback;
+import org.slf4j.Logger;
+
+public final class LoggingFutureCallback implements FutureCallback<Void> {
+ private final String componentName;
+ private final Logger LOG;
+
+ public LoggingFutureCallback(String componentName, Logger log) {
+ this.componentName = componentName;
+ LOG = log;
+ }
+
+ @Override
+ public void onSuccess(Void result) {
+ LOG.info("{} exited", componentName);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ if (t instanceof CancellationException) {
+ LOG.info("{} was cancelled", componentName, t.getMessage());
+ } else {
+ LOG.warn("{} exited with error", componentName, t);
+ }
+ }
+}
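LoggingFutureCallback gives the scheduler one place to log how a long-running background callable (such as the delayed-task or node-enabler loops) terminates. A usage sketch with Guava's listenable futures follows - the component name and executor wiring are illustrative assumptions, and older Guava versions expose a two-argument addCallback overload instead of the three-argument one used here:

import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CallbackSketch {
  private static final Logger LOG = LoggerFactory.getLogger(CallbackSketch.class);

  public static void main(String[] args) {
    ListeningExecutorService executor =
        MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
    ListenableFuture<Void> future = executor.submit(new Callable<Void>() {
      @Override
      public Void call() {
        return null; // stand-in for a long-running scheduler loop
      }
    });
    // Logs "exited", "was cancelled" or "exited with error" when the loop ends.
    Futures.addCallback(future, new LoggingFutureCallback("DemoLoop", LOG),
        MoreExecutors.directExecutor());
    executor.shutdown();
  }
}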
http://git-wip-us.apache.org/repos/asf/hive/blob/347a5a55/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
index b2cd55e..e4fe79c 100644
--- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
+++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
@@ -15,7 +15,9 @@
package org.apache.hadoop.hive.llap.tezplugins;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
@@ -26,6 +28,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.io.IOException;
+import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -37,6 +40,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.registry.impl.LlapFixedRegistryImpl;
import org.apache.hadoop.hive.llap.testhelpers.ControlledClock;
+import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@@ -44,7 +49,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
@@ -62,7 +66,7 @@ public class TestLlapTaskSchedulerService {
private static final String HOST2 = "host2";
private static final String HOST3 = "host3";
- @Test (timeout = 5000)
+ @Test(timeout = 10000)
public void testSimpleLocalAllocation() throws IOException, InterruptedException {
TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper();
@@ -77,18 +81,17 @@ public class TestLlapTaskSchedulerService {
tsWrapper.controlScheduler(true);
tsWrapper.allocateTask(task1, hosts1, priority1, clientCookie1);
- tsWrapper.signalSchedulerRun();
- tsWrapper.awaitSchedulerRun();
+ tsWrapper.awaitLocalTaskAllocations(1);
verify(tsWrapper.mockAppCallback).taskAllocated(eq(task1), eq(clientCookie1), any(Container.class));
- // TODO Verify this is on host1.
assertEquals(1, tsWrapper.ts.dagStats.numLocalAllocations);
+ assertEquals(1, tsWrapper.ts.dagStats.numAllocationsPerHost.get(HOST1).get());
} finally {
tsWrapper.shutdown();
}
}
- @Test (timeout = 5000)
+ @Test(timeout = 10000)
public void testSimpleNoLocalityAllocation() throws IOException, InterruptedException {
TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper();
@@ -99,8 +102,7 @@ public class TestLlapTaskSchedulerService {
Object clientCookie1 = new Object();
tsWrapper.controlScheduler(true);
tsWrapper.allocateTask(task1, null, priority1, clientCookie1);
- tsWrapper.signalSchedulerRun();
- tsWrapper.awaitSchedulerRun();
+ tsWrapper.awaitTotalTaskAllocations(1);
verify(tsWrapper.mockAppCallback).taskAllocated(eq(task1), eq(clientCookie1), any(Container.class));
assertEquals(1, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest);
} finally {
@@ -109,7 +111,7 @@ public class TestLlapTaskSchedulerService {
}
- @Test(timeout=5000)
+ @Test(timeout = 10000)
public void testPreemption() throws InterruptedException, IOException {
Priority priority1 = Priority.newInstance(1);
@@ -174,7 +176,7 @@ public class TestLlapTaskSchedulerService {
}
- @Test(timeout=5000)
+ @Test(timeout = 10000)
public void testNodeDisabled() throws IOException, InterruptedException {
TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(10000l);
try {
@@ -233,7 +235,7 @@ public class TestLlapTaskSchedulerService {
}
}
- @Test(timeout=5000)
+ @Test(timeout = 10000)
public void testNodeReEnabled() throws InterruptedException, IOException {
// Based on actual timing.
TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(1000l);
@@ -307,7 +309,7 @@ public class TestLlapTaskSchedulerService {
}
}
- @Test (timeout = 5000)
+ @Test(timeout = 10000)
public void testForceLocalityTest1() throws IOException, InterruptedException {
// 2 hosts. 2 per host. 5 requests at the same priority.
// First 3 on host1, Next at host2, Last with no host.
@@ -316,7 +318,7 @@ public class TestLlapTaskSchedulerService {
}
- @Test (timeout = 5000)
+ @Test(timeout = 10000)
public void testNoForceLocalityCounterTest1() throws IOException, InterruptedException {
// 2 hosts. 2 per host. 5 requests at the same priority.
// First 3 on host1, Next at host2, Last with no host.
@@ -411,7 +413,7 @@ public class TestLlapTaskSchedulerService {
}
}
- @Test(timeout = 5000)
+ @Test(timeout = 10000)
public void testForcedLocalityUnknownHost() throws IOException, InterruptedException {
Priority priority1 = Priority.newInstance(1);
@@ -454,15 +456,13 @@ public class TestLlapTaskSchedulerService {
}
}
-
- @Test(timeout = 5000)
+ @Test(timeout = 10000)
public void testForcedLocalityPreemption() throws IOException, InterruptedException {
Priority priority1 = Priority.newInstance(1);
Priority priority2 = Priority.newInstance(2);
String [] hosts = new String[] {HOST1, HOST2};
String [] hostsH1 = new String[] {HOST1};
- String [] hostsH2 = new String[] {HOST2};
TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, -1l);
@@ -485,13 +485,7 @@ public class TestLlapTaskSchedulerService {
tsWrapper.allocateTask(task2, hostsH1, priority2, clientCookie2);
// This request at a lower priority should not affect anything.
tsWrapper.allocateTask(task3, hostsH1, priority2, clientCookie3);
- while (true) {
- tsWrapper.signalSchedulerRun();
- tsWrapper.awaitSchedulerRun();
- if (tsWrapper.ts.dagStats.numLocalAllocations == 2) {
- break;
- }
- }
+ tsWrapper.awaitLocalTaskAllocations(2);
verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
@@ -517,13 +511,8 @@ public class TestLlapTaskSchedulerService {
tsWrapper.deallocateTask(task1, false, TaskAttemptEndReason.INTERNAL_PREEMPTION);
- while (true) {
- tsWrapper.signalSchedulerRun();
- tsWrapper.awaitSchedulerRun();
- if (tsWrapper.ts.dagStats.numTotalAllocations == 3) {
- break;
- }
- }
+ tsWrapper.awaitLocalTaskAllocations(3);
+
verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(eq(task4),
eq(clientCookie4), any(Container.class));
@@ -532,11 +521,471 @@ public class TestLlapTaskSchedulerService {
}
}
+ @Test(timeout = 10000)
+ public void testForcedLocalityNotInDelayedQueue() throws IOException, InterruptedException {
+ String[] hosts = new String[]{HOST1, HOST2};
+
+ String[] hostsH1 = new String[]{HOST1};
+
+ TestTaskSchedulerServiceWrapper tsWrapper =
+ new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, -1l);
+ testNotInQueue(tsWrapper, hostsH1);
+ }
+
+ @Test(timeout = 10000)
+ public void testNoLocalityNotInDelayedQueue() throws IOException, InterruptedException {
+ String[] hosts = new String[]{HOST1};
+
+ String[] hostsH1 = new String[]{HOST1};
+
+ TestTaskSchedulerServiceWrapper tsWrapper =
+ new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, 0l);
+ testNotInQueue(tsWrapper, hostsH1);
+ }
+
+ private void testNotInQueue(TestTaskSchedulerServiceWrapper tsWrapper, String[] hosts) throws
+ InterruptedException {
+ Priority priority1 = Priority.newInstance(1);
+ try {
+ tsWrapper.controlScheduler(true);
+ tsWrapper.allocateTask(hosts, priority1);
+ tsWrapper.allocateTask(hosts, priority1);
+ tsWrapper.allocateTask(hosts, priority1); // 1 more than capacity.
+
+ tsWrapper.awaitLocalTaskAllocations(2);
+
+ assertEquals(0, tsWrapper.ts.delayedTaskQueue.size());
+
+ } finally {
+ tsWrapper.shutdown();
+ }
+ }
+
+ @Test(timeout = 10000)
+ public void testDelayedLocalityFallbackToNonLocal() throws IOException, InterruptedException {
+ Priority priority1 = Priority.newInstance(1);
+ String [] hosts = new String[] {HOST1, HOST2};
+
+ String [] hostsH1 = new String[] {HOST1};
+
+ TestTaskSchedulerServiceWrapper tsWrapper =
+ new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, 10000l, true);
+ LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled
+ delayedTaskSchedulerCallableControlled =
+ (LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled) tsWrapper.ts.delayedTaskSchedulerCallable;
+ ControlledClock clock = tsWrapper.getClock();
+ clock.setTime(clock.getTime());
+
+ // Fill up host1 with tasks. Leave host2 empty.
+ try {
+ tsWrapper.controlScheduler(true);
+ Object task1 = tsWrapper.allocateTask(hostsH1, priority1);
+ Object task2 = tsWrapper.allocateTask(hostsH1, priority1);
+ Object task3 = tsWrapper.allocateTask(hostsH1, priority1); // 1 more than capacity.
+
+ tsWrapper.awaitLocalTaskAllocations(2);
+
+ verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+ ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
+ verify(tsWrapper.mockAppCallback, times(2))
+ .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
+ assertEquals(2, argumentCaptor.getAllValues().size());
+ assertEquals(task1, argumentCaptor.getAllValues().get(0));
+ assertEquals(task2, argumentCaptor.getAllValues().get(1));
+
+ reset(tsWrapper.mockAppCallback);
+
+ // No capacity left on node1. The next task should be allocated to node2 after it times out.
+ clock.setTime(clock.getTime() + 10000l); // Past the timeout.
+
+ assertEquals(
+ LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_NOT_RUN,
+ delayedTaskSchedulerCallableControlled.lastState);
+
+ delayedTaskSchedulerCallableControlled.triggerGetNextTask();
+ delayedTaskSchedulerCallableControlled.awaitGetNextTaskProcessing();
+
+ // Verify that the task was returned from the delayed queue, and that the decision was to schedule it.
+ assertEquals(
+ LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_RETURNED_TASK,
+ delayedTaskSchedulerCallableControlled.lastState);
+ assertTrue(delayedTaskSchedulerCallableControlled.shouldScheduleTaskTriggered &&
+ delayedTaskSchedulerCallableControlled.lastShouldScheduleTaskResult);
+
+ tsWrapper.awaitChangeInTotalAllocations(2);
+
+ verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+ argumentCaptor = ArgumentCaptor.forClass(Object.class);
+ ArgumentCaptor<Container> containerCaptor = ArgumentCaptor.forClass(Container.class);
+ verify(tsWrapper.mockAppCallback, times(1))
+ .taskAllocated(argumentCaptor.capture(), any(Object.class), containerCaptor.capture());
+ assertEquals(1, argumentCaptor.getAllValues().size());
+ assertEquals(task3, argumentCaptor.getAllValues().get(0));
+ Container assignedContainer = containerCaptor.getValue();
+ assertEquals(HOST2, assignedContainer.getNodeId().getHost());
+
+
+ assertEquals(2, tsWrapper.ts.dagStats.numLocalAllocations);
+ assertEquals(1, tsWrapper.ts.dagStats.numNonLocalAllocations);
+ assertEquals(1, tsWrapper.ts.dagStats.numDelayedAllocations);
+ assertEquals(2, tsWrapper.ts.dagStats.numAllocationsPerHost.get(HOST1).get());
+ assertEquals(1, tsWrapper.ts.dagStats.numAllocationsPerHost.get(HOST2).get());
+
+ } finally {
+ tsWrapper.shutdown();
+ }
+ }
+
+ @Test(timeout = 10000)
+ public void testDelayedLocalityDelayedAllocation() throws InterruptedException, IOException {
+ Priority priority1 = Priority.newInstance(1);
+ String [] hosts = new String[] {HOST1, HOST2};
+
+ String [] hostsH1 = new String[] {HOST1};
+
+ TestTaskSchedulerServiceWrapper tsWrapper =
+ new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, 10000l, true);
+ LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled
+ delayedTaskSchedulerCallableControlled =
+ (LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled) tsWrapper.ts.delayedTaskSchedulerCallable;
+ ControlledClock clock = tsWrapper.getClock();
+ clock.setTime(clock.getTime());
+
+ // Fill up host1 with tasks. Leave host2 empty.
+ try {
+ tsWrapper.controlScheduler(true);
+ Object task1 = tsWrapper.allocateTask(hostsH1, priority1);
+ Object task2 = tsWrapper.allocateTask(hostsH1, priority1);
+ Object task3 = tsWrapper.allocateTask(hostsH1, priority1); // 1 more than capacity.
+
+ tsWrapper.awaitLocalTaskAllocations(2);
+
+ verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+ ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
+ verify(tsWrapper.mockAppCallback, times(2))
+ .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
+ assertEquals(2, argumentCaptor.getAllValues().size());
+ assertEquals(task1, argumentCaptor.getAllValues().get(0));
+ assertEquals(task2, argumentCaptor.getAllValues().get(1));
+
+ reset(tsWrapper.mockAppCallback);
+
+ // Move the clock forward 2000ms, and check the delayed queue
+ clock.setTime(clock.getTime() + 2000l); // Past the timeout.
+
+ assertEquals(
+ LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_NOT_RUN,
+ delayedTaskSchedulerCallableControlled.lastState);
+
+ delayedTaskSchedulerCallableControlled.triggerGetNextTask();
+ delayedTaskSchedulerCallableControlled.awaitGetNextTaskProcessing();
+
+ // Verify that the locality timeout has not expired yet, so the task was not returned for scheduling.
+ assertEquals(
+ LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_TIMEOUT_NOT_EXPIRED,
+ delayedTaskSchedulerCallableControlled.lastState);
+ assertFalse(delayedTaskSchedulerCallableControlled.shouldScheduleTaskTriggered);
+
+ tsWrapper.deallocateTask(task1, true, null);
+
+ // Node1 now has free capacity. task1 should be allocated to it.
+ tsWrapper.awaitChangeInTotalAllocations(2);
+
+ verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+ argumentCaptor = ArgumentCaptor.forClass(Object.class);
+ ArgumentCaptor<Container> containerCaptor = ArgumentCaptor.forClass(Container.class);
+ verify(tsWrapper.mockAppCallback, times(1))
+ .taskAllocated(argumentCaptor.capture(), any(Object.class), containerCaptor.capture());
+ assertEquals(1, argumentCaptor.getAllValues().size());
+ assertEquals(task3, argumentCaptor.getAllValues().get(0));
+ Container assignedContainer = containerCaptor.getValue();
+ assertEquals(HOST1, assignedContainer.getNodeId().getHost());
+
+
+ assertEquals(3, tsWrapper.ts.dagStats.numLocalAllocations);
+ assertEquals(0, tsWrapper.ts.dagStats.numNonLocalAllocations);
+ assertEquals(1, tsWrapper.ts.dagStats.numDelayedAllocations);
+ assertEquals(3, tsWrapper.ts.dagStats.numAllocationsPerHost.get(HOST1).get());
+
+ } finally {
+ tsWrapper.shutdown();
+ }
+ }
+
+ @Test(timeout = 10000)
+ public void testDelayedQueueTaskSelectionAfterScheduled() throws IOException,
+ InterruptedException {
+ Priority priority1 = Priority.newInstance(1);
+ String [] hosts = new String[] {HOST1, HOST2};
+
+ String [] hostsH1 = new String[] {HOST1};
+
+ TestTaskSchedulerServiceWrapper tsWrapper =
+ new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, 10000l, true);
+ LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled
+ delayedTaskSchedulerCallableControlled =
+ (LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled) tsWrapper.ts.delayedTaskSchedulerCallable;
+ ControlledClock clock = tsWrapper.getClock();
+ clock.setTime(clock.getTime());
+
+ // Fill up host1 with tasks. Leave host2 empty.
+ try {
+ tsWrapper.controlScheduler(true);
+ Object task1 = tsWrapper.allocateTask(hostsH1, priority1);
+ Object task2 = tsWrapper.allocateTask(hostsH1, priority1);
+ Object task3 = tsWrapper.allocateTask(hostsH1, priority1); // 1 more than capacity.
+
+ tsWrapper.awaitLocalTaskAllocations(2);
+
+ verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+ ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
+ verify(tsWrapper.mockAppCallback, times(2))
+ .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
+ assertEquals(2, argumentCaptor.getAllValues().size());
+ assertEquals(task1, argumentCaptor.getAllValues().get(0));
+ assertEquals(task2, argumentCaptor.getAllValues().get(1));
+
+ // Simulate a 2s delay before finishing the task.
+ clock.setTime(clock.getTime() + 2000);
+
+ assertEquals(
+ LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_NOT_RUN,
+ delayedTaskSchedulerCallableControlled.lastState);
+
+ delayedTaskSchedulerCallableControlled.triggerGetNextTask();
+ delayedTaskSchedulerCallableControlled.awaitGetNextTaskProcessing();
+ assertEquals(
+ LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_TIMEOUT_NOT_EXPIRED,
+ delayedTaskSchedulerCallableControlled.lastState);
+ assertFalse(delayedTaskSchedulerCallableControlled.shouldScheduleTaskTriggered);
+
+ reset(tsWrapper.mockAppCallback);
+
+ // Now finish task1, which will make capacity for task3 to run. Nothing is coming out of the delayed queue yet.
+ tsWrapper.deallocateTask(task1, true, null);
+ tsWrapper.awaitLocalTaskAllocations(3);
+
+ verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+ argumentCaptor = ArgumentCaptor.forClass(Object.class);
+ ArgumentCaptor<Container> containerCaptor = ArgumentCaptor.forClass(Container.class);
+ verify(tsWrapper.mockAppCallback, times(1))
+ .taskAllocated(argumentCaptor.capture(), any(Object.class), containerCaptor.capture());
+ assertEquals(1, argumentCaptor.getAllValues().size());
+ assertEquals(task3, argumentCaptor.getAllValues().get(0));
+ Container assignedContainer = containerCaptor.getValue();
+ assertEquals(HOST1, assignedContainer.getNodeId().getHost());
+
+ reset(tsWrapper.mockAppCallback);
+
+ // Move the clock forward and trigger a run.
+ clock.setTime(clock.getTime() + 8000); // Set to start + 10000 which is the timeout
+ delayedTaskSchedulerCallableControlled.triggerGetNextTask();
+ delayedTaskSchedulerCallableControlled.awaitGetNextTaskProcessing();
+ assertEquals(
+ LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_RETURNED_TASK,
+ delayedTaskSchedulerCallableControlled.lastState);
+ // Verify that an attempt was made to schedule the task, but the decision was to skip scheduling
+ assertTrue(delayedTaskSchedulerCallableControlled.shouldScheduleTaskTriggered &&
+ !delayedTaskSchedulerCallableControlled.lastShouldScheduleTaskResult);
+
+ // Ensure there's no more invocations.
+ verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+ verify(tsWrapper.mockAppCallback, never()).taskAllocated(any(Object.class), any(Object.class), any(Container.class));
+
+ } finally {
+ tsWrapper.shutdown();
+ }
+ }
+
+ @Test(timeout = 10000)
+ public void testTaskInfoDelay() {
+
+ LlapTaskSchedulerService.LocalityDelayConf localityDelayConf1 =
+ new LlapTaskSchedulerService.LocalityDelayConf(3000);
+
+ ControlledClock clock = new ControlledClock(new MonotonicClock());
+ clock.setTime(clock.getTime());
+
+
+ // With a timeout of 3000.
+ LlapTaskSchedulerService.TaskInfo taskInfo =
+ new LlapTaskSchedulerService.TaskInfo(localityDelayConf1, clock, new Object(), new Object(),
+ mock(Priority.class), mock(Resource.class), null, null, clock.getTime());
+
+ assertFalse(taskInfo.shouldForceLocality());
+
+ assertEquals(3000, taskInfo.getDelay(TimeUnit.MILLISECONDS));
+ assertTrue(taskInfo.shouldDelayForLocality(clock.getTime()));
+
+ clock.setTime(clock.getTime() + 500);
+ assertEquals(2500, taskInfo.getDelay(TimeUnit.MILLISECONDS));
+ assertTrue(taskInfo.shouldDelayForLocality(clock.getTime()));
+
+ clock.setTime(clock.getTime() + 2500);
+ assertEquals(0, taskInfo.getDelay(TimeUnit.MILLISECONDS));
+ assertFalse(taskInfo.shouldDelayForLocality(clock.getTime()));
+
+
+ // No locality delay
+ LlapTaskSchedulerService.LocalityDelayConf localityDelayConf2 =
+ new LlapTaskSchedulerService.LocalityDelayConf(0);
+ taskInfo =
+ new LlapTaskSchedulerService.TaskInfo(localityDelayConf2, clock, new Object(), new Object(),
+ mock(Priority.class), mock(Resource.class), null, null, clock.getTime());
+ assertFalse(taskInfo.shouldDelayForLocality(clock.getTime()));
+ assertFalse(taskInfo.shouldForceLocality());
+ assertTrue(taskInfo.getDelay(TimeUnit.MILLISECONDS) < 0);
+
+ // Force locality
+ LlapTaskSchedulerService.LocalityDelayConf localityDelayConf3 =
+ new LlapTaskSchedulerService.LocalityDelayConf(-1);
+ taskInfo =
+ new LlapTaskSchedulerService.TaskInfo(localityDelayConf3, clock, new Object(), new Object(),
+ mock(Priority.class), mock(Resource.class), null, null, clock.getTime());
+ assertTrue(taskInfo.shouldDelayForLocality(clock.getTime()));
+ assertTrue(taskInfo.shouldForceLocality());
+ assertFalse(taskInfo.getDelay(TimeUnit.MILLISECONDS) < 0);
+ }
+
+ @Test(timeout = 10000)
+ public void testLocalityDelayTaskOrdering() throws InterruptedException, IOException {
+
+ LlapTaskSchedulerService.LocalityDelayConf localityDelayConf =
+ new LlapTaskSchedulerService.LocalityDelayConf(3000);
+
+ ControlledClock clock = new ControlledClock(new MonotonicClock());
+ clock.setTime(clock.getTime());
+
+ DelayQueue<LlapTaskSchedulerService.TaskInfo> delayedQueue = new DelayQueue<>();
+
+ LlapTaskSchedulerService.TaskInfo taskInfo1 =
+ new LlapTaskSchedulerService.TaskInfo(localityDelayConf, clock, new Object(), new Object(),
+ mock(Priority.class), mock(Resource.class), null, null, clock.getTime());
+
+ clock.setTime(clock.getTime() + 1000);
+ LlapTaskSchedulerService.TaskInfo taskInfo2 =
+ new LlapTaskSchedulerService.TaskInfo(localityDelayConf, clock, new Object(), new Object(),
+ mock(Priority.class), mock(Resource.class), null, null, clock.getTime());
+
+ delayedQueue.add(taskInfo1);
+ delayedQueue.add(taskInfo2);
+
+ assertEquals(taskInfo1, delayedQueue.peek());
+ }
+
+ @Test (timeout = 15000)
+ public void testDelayedLocalityNodeCommErrorImmediateAllocation() throws IOException, InterruptedException {
+ Priority priority1 = Priority.newInstance(1);
+ String [] hosts = new String[] {HOST1, HOST2};
+
+ String [] hostsH1 = new String[] {HOST1};
+
+ // Node disable timeout higher than locality delay.
+ TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(20000, hosts, 1, 1, 10000l);
+
+ // Fill up host1 with tasks. Leave host2 empty.
+ try {
+ long startTime = tsWrapper.getClock().getTime();
+ tsWrapper.controlScheduler(true);
+ Object task1 = tsWrapper.allocateTask(hostsH1, priority1);
+ Object task2 = tsWrapper.allocateTask(hostsH1, priority1);
+ Object task3 = tsWrapper.allocateTask(hostsH1, priority1); // 1 more than capacity.
+
+ tsWrapper.awaitLocalTaskAllocations(2);
+
+ verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+ ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
+ verify(tsWrapper.mockAppCallback, times(2))
+ .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
+ assertEquals(2, argumentCaptor.getAllValues().size());
+ assertEquals(task1, argumentCaptor.getAllValues().get(0));
+ assertEquals(task2, argumentCaptor.getAllValues().get(1));
+
+ reset(tsWrapper.mockAppCallback);
+
+ // Mark a task as failed due to a comm failure.
+ tsWrapper.deallocateTask(task1, false, TaskAttemptEndReason.COMMUNICATION_ERROR);
+
+ // Node1 marked as failed, node2 has capacity.
+ // Timeout for nodes is larger than delay - immediate allocation
+ tsWrapper.awaitChangeInTotalAllocations(2);
+
+ long thirdAllocateTime = tsWrapper.getClock().getTime();
+ long diff = thirdAllocateTime - startTime;
+ // The third allocation must happen before the locality delay (10s) would have expired.
+ assertTrue("Task not allocated in expected time window: duration=" + diff, diff < 10000l);
+
+ verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+ argumentCaptor = ArgumentCaptor.forClass(Object.class);
+ ArgumentCaptor<Container> containerCaptor = ArgumentCaptor.forClass(Container.class);
+ verify(tsWrapper.mockAppCallback, times(1))
+ .taskAllocated(argumentCaptor.capture(), any(Object.class), containerCaptor.capture());
+ assertEquals(1, argumentCaptor.getAllValues().size());
+ assertEquals(task3, argumentCaptor.getAllValues().get(0));
+ Container assignedContainer = containerCaptor.getValue();
+ assertEquals(HOST2, assignedContainer.getNodeId().getHost());
+
+
+ assertEquals(2, tsWrapper.ts.dagStats.numLocalAllocations);
+ assertEquals(1, tsWrapper.ts.dagStats.numNonLocalAllocations);
+ assertEquals(1, tsWrapper.ts.dagStats.numDelayedAllocations);
+ assertEquals(2, tsWrapper.ts.dagStats.numAllocationsPerHost.get(HOST1).get());
+ assertEquals(1, tsWrapper.ts.dagStats.numAllocationsPerHost.get(HOST2).get());
+
+ } finally {
+ tsWrapper.shutdown();
+ }
+ }
+
+ @Test (timeout = 15000)
+ public void testDelayedLocalityNodeCommErrorDelayedAllocation() throws IOException, InterruptedException {
+ Priority priority1 = Priority.newInstance(1);
+ String [] hosts = new String[] {HOST1, HOST2};
+
+ String [] hostsH1 = new String[] {HOST1};
+
+ TestTaskSchedulerServiceWrapper tsWrapper =
+ new TestTaskSchedulerServiceWrapper(5000, hosts, 1, 1, 10000l, true);
+ LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled
+ delayedTaskSchedulerCallableControlled =
+ (LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled) tsWrapper.ts.delayedTaskSchedulerCallable;
+ ControlledClock clock = tsWrapper.getClock();
+ clock.setTime(clock.getTime());
+
+ // Fill up host1 with tasks. Leave host2 empty.
+ try {
+ tsWrapper.controlScheduler(true);
+ Object task1 = tsWrapper.allocateTask(hostsH1, priority1);
+ Object task2 = tsWrapper.allocateTask(hostsH1, priority1);
+ Object task3 = tsWrapper.allocateTask(hostsH1, priority1); // 1 more than capacity.
+
+ tsWrapper.awaitLocalTaskAllocations(2);
+
+ verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+ ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
+ verify(tsWrapper.mockAppCallback, times(2))
+ .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
+ assertEquals(2, argumentCaptor.getAllValues().size());
+ assertEquals(task1, argumentCaptor.getAllValues().get(0));
+ assertEquals(task2, argumentCaptor.getAllValues().get(1));
+
+ reset(tsWrapper.mockAppCallback);
+
+ // Mark a task as failed due to a comm failure.
+ tsWrapper.deallocateTask(task1, false, TaskAttemptEndReason.COMMUNICATION_ERROR);
+
+ // Node1 has free capacity but is disabled. Node2 has capacity. Delay > re-enable timeout.
+ tsWrapper.ensureNoChangeInTotalAllocations(2, 2000l);
+ } finally {
+ tsWrapper.shutdown();
+ }
+ }
+
private static class TestTaskSchedulerServiceWrapper {
static final Resource resource = Resource.newInstance(1024, 1);
Configuration conf;
TaskSchedulerContext mockAppCallback = mock(TaskSchedulerContext.class);
- ControlledClock clock = new ControlledClock(new SystemClock());
+ ControlledClock clock = new ControlledClock(new MonotonicClock());
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(1000, 1), 1);
LlapTaskSchedulerServiceForTest ts;
@@ -555,14 +1004,21 @@ public class TestLlapTaskSchedulerService {
this(disableTimeoutMillis, hosts, numExecutors, waitQueueSize, 0l);
}
- TestTaskSchedulerServiceWrapper(long disableTimeoutMillis, String[] hosts, int numExecutors, int waitQueueSize, long localityDelayMs) throws
+ TestTaskSchedulerServiceWrapper(long nodeDisableTimeoutMillis, String[] hosts, int numExecutors,
+ int waitQueueSize, long localityDelayMs) throws
+ IOException, InterruptedException {
+ this(nodeDisableTimeoutMillis, hosts, numExecutors, waitQueueSize, localityDelayMs, false);
+ }
+
+ TestTaskSchedulerServiceWrapper(long nodeDisableTimeoutMillis, String[] hosts, int numExecutors,
+ int waitQueueSize, long localityDelayMs, boolean controlledDelayedTaskQueue) throws
IOException, InterruptedException {
conf = new Configuration();
conf.setStrings(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, hosts);
conf.setInt(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, numExecutors);
conf.setInt(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname, waitQueueSize);
conf.set(ConfVars.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS.varname,
- disableTimeoutMillis + "ms");
+ nodeDisableTimeoutMillis + "ms");
conf.setBoolean(LlapFixedRegistryImpl.FIXED_REGISTRY_RESOLVE_HOST_NAMES, false);
conf.setLong(ConfVars.LLAP_TASK_SCHEDULER_LOCALITY_DELAY.varname, localityDelayMs);
@@ -571,7 +1027,11 @@ public class TestLlapTaskSchedulerService {
UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
doReturn(userPayload).when(mockAppCallback).getInitialUserPayload();
- ts = new LlapTaskSchedulerServiceForTest(mockAppCallback, clock);
+ if (controlledDelayedTaskQueue) {
+ ts = new LlapTaskSchedulerServiceForTestControlled(mockAppCallback, clock);
+ } else {
+ ts = new LlapTaskSchedulerServiceForTest(mockAppCallback, clock);
+ }
controlScheduler(true);
ts.initialize();
@@ -582,6 +1042,10 @@ public class TestLlapTaskSchedulerService {
awaitSchedulerRun();
}
+ ControlledClock getClock() {
+ return clock;
+ }
+
void controlScheduler(boolean val) {
ts.forTestsetControlScheduling(val);
}
@@ -591,8 +1055,19 @@ public class TestLlapTaskSchedulerService {
}
void awaitSchedulerRun() throws InterruptedException {
- ts.forTestAwaitSchedulingRun();
+ ts.forTestAwaitSchedulingRun(-1);
+ }
+
+ /**
+ * Waits for the current scheduler run to complete.
+ *
+ * @param timeoutMs maximum time to wait, in milliseconds; -1 waits with no timeout
+ * @return false if the timeout elapsed before the scheduler run completed
+ * @throws InterruptedException if the waiting thread is interrupted
+ */
+ boolean awaitSchedulerRun(long timeoutMs) throws InterruptedException {
+ return ts.forTestAwaitSchedulingRun(timeoutMs);
}
+
void resetAppCallback() {
reset(mockAppCallback);
}
@@ -605,6 +1080,8 @@ public class TestLlapTaskSchedulerService {
ts.allocateTask(task, resource, hosts, null, priority, null, clientCookie);
}
+
+
void deallocateTask(Object task, boolean succeeded, TaskAttemptEndReason endReason) {
ts.deallocateTask(task, succeeded, endReason, null);
}
@@ -612,6 +1089,60 @@ public class TestLlapTaskSchedulerService {
void rejectExecution(Object task) {
ts.deallocateTask(task, false, TaskAttemptEndReason.EXECUTOR_BUSY, null);
}
+
+
+ // More complex methods which may wrap multiple operations
+ Object allocateTask(String[] hosts, Priority priority) {
+ Object task = new Object();
+ Object clientCookie = new Object();
+ allocateTask(task, hosts, priority, clientCookie);
+ return task;
+ }
+
+ public void awaitTotalTaskAllocations(int numTasks) throws InterruptedException {
+ while (true) {
+ signalSchedulerRun();
+ awaitSchedulerRun();
+ if (ts.dagStats.numTotalAllocations == numTasks) {
+ break;
+ }
+ }
+ }
+
+ public void awaitLocalTaskAllocations(int numTasks) throws InterruptedException {
+ while (true) {
+ signalSchedulerRun();
+ awaitSchedulerRun();
+ if (ts.dagStats.numLocalAllocations == numTasks) {
+ break;
+ }
+ }
+ }
+
+ public void awaitChangeInTotalAllocations(int previousAllocations) throws InterruptedException {
+ while (true) {
+ signalSchedulerRun();
+ awaitSchedulerRun();
+ if (ts.dagStats.numTotalAllocations > previousAllocations) {
+ break;
+ }
+ Thread.sleep(200l);
+ }
+ }
+
+ public void ensureNoChangeInTotalAllocations(int previousAllocations, long timeout) throws
+ InterruptedException {
+ long startTime = Time.monotonicNow();
+ long timeLeft = timeout;
+ while (timeLeft > 0) {
+ signalSchedulerRun();
+ awaitSchedulerRun(Math.min(200, timeLeft));
+ if (ts.dagStats.numTotalAllocations != previousAllocations) {
+ throw new IllegalStateException("NumTotalAllocations expected to stay at " + previousAllocations + ". Actual=" + ts.dagStats.numTotalAllocations);
+ }
+ timeLeft = (startTime + timeout) - Time.monotonicNow();
+ }
+ }
}
private static class LlapTaskSchedulerServiceForTest extends LlapTaskSchedulerService {
@@ -632,6 +1163,7 @@ public class TestLlapTaskSchedulerService {
@Override
protected void schedulePendingTasks() {
+ LOG.info("Attempted schedulPendingTasks");
testLock.lock();
try {
if (controlScheduling.get()) {
@@ -668,17 +1200,143 @@ public class TestLlapTaskSchedulerService {
}
}
- void forTestAwaitSchedulingRun() throws InterruptedException {
+ boolean forTestAwaitSchedulingRun(long timeout) throws InterruptedException {
testLock.lock();
try {
+ boolean success = true;
while (!schedulingComplete) {
- schedulingCompleteCondition.await();
+ if (timeout == -1) {
+ schedulingCompleteCondition.await();
+ } else {
+ success = schedulingCompleteCondition.await(timeout, TimeUnit.MILLISECONDS);
+ break;
+ }
}
schedulingComplete = false;
+ return success;
} finally {
testLock.unlock();
}
}
}
+
+ private static class LlapTaskSchedulerServiceForTestControlled extends LlapTaskSchedulerServiceForTest {
+
+ private DelayedTaskSchedulerCallableControlled controlledTSCallable;
+
+ public LlapTaskSchedulerServiceForTestControlled(
+ TaskSchedulerContext appClient, Clock clock) {
+ super(appClient, clock);
+ }
+
+ @Override
+ LlapTaskSchedulerService.DelayedTaskSchedulerCallable createDelayedTaskSchedulerCallable() {
+ controlledTSCallable = new DelayedTaskSchedulerCallableControlled();
+ return controlledTSCallable;
+ }
+
+ class DelayedTaskSchedulerCallableControlled extends DelayedTaskSchedulerCallable {
+ private final ReentrantLock lock = new ReentrantLock();
+ private final Condition triggerRunCondition = lock.newCondition();
+ private boolean shouldRun = false;
+ private final Condition runCompleteCondition = lock.newCondition();
+ private boolean runComplete = false;
+
+ static final int STATE_NOT_RUN = 0;
+ static final int STATE_NULL_FOUND = 1;
+ static final int STATE_TIMEOUT_NOT_EXPIRED = 2;
+ static final int STATE_RETURNED_TASK = 3;
+
+ volatile int lastState = STATE_NOT_RUN;
+
+ volatile boolean lastShouldScheduleTaskResult = false;
+ volatile boolean shouldScheduleTaskTriggered = false;
+
+ @Override
+ public void processEvictedTask(TaskInfo taskInfo) {
+ super.processEvictedTask(taskInfo);
+ signalRunComplete();
+ }
+
+ @Override
+ public TaskInfo getNextTask() throws InterruptedException {
+
+ while (true) {
+ lock.lock();
+ try {
+ while (!shouldRun) {
+ triggerRunCondition.await();
+ }
+ // Prevent subsequent runs until a new trigger is set.
+ shouldRun = false;
+ } finally {
+ lock.unlock();
+ }
+ TaskInfo taskInfo = delayedTaskQueue.peek();
+ if (taskInfo == null) {
+ LOG.info("Triggered getTask but the queue is empty");
+ lastState = STATE_NULL_FOUND;
+ signalRunComplete();
+ continue;
+ }
+ if (taskInfo.shouldDelayForLocality(
+ LlapTaskSchedulerServiceForTestControlled.this.clock.getTime())) {
+ LOG.info("Triggered getTask but the first element is not ready to execute");
+ lastState = STATE_TIMEOUT_NOT_EXPIRED;
+ signalRunComplete();
+ continue;
+ } else {
+ delayedTaskQueue.poll(); // Remove the previously peeked element.
+ lastState = STATE_RETURNED_TASK;
+ return taskInfo;
+ }
+ }
+ }
+
+ @Override
+ public boolean shouldScheduleTask(TaskInfo taskInfo) {
+ shouldScheduleTaskTriggered = true;
+ lastShouldScheduleTaskResult = super.shouldScheduleTask(taskInfo);
+ return lastShouldScheduleTaskResult;
+ }
+
+ void resetShouldScheduleInformation() {
+ shouldScheduleTaskTriggered = false;
+ lastShouldScheduleTaskResult = false;
+ }
+
+ private void signalRunComplete() {
+ lock.lock();
+ try {
+ runComplete = true;
+ runCompleteCondition.signal();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ void triggerGetNextTask() {
+ lock.lock();
+ try {
+ shouldRun = true;
+ triggerRunCondition.signal();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ void awaitGetNextTaskProcessing() throws InterruptedException {
+ lock.lock();
+ try {
+ while (!runComplete) {
+ runCompleteCondition.await();
+ }
+ runComplete = false;
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+ }
}
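A pattern worth noting in these tests: time never passes by sleeping. The wrapper injects a ControlledClock, and the tests advance it explicitly before triggering the delayed-task callable, which keeps every delay and timeout assertion deterministic. A minimal settable clock in that spirit could look like the following - this standalone version is illustrative, not the Hive test helper itself:

import org.apache.hadoop.yarn.util.Clock;

public class SettableClock implements Clock {
  private volatile long time;

  public SettableClock(long initialTime) {
    this.time = initialTime;
  }

  @Override
  public long getTime() {
    return time; // returns whatever the test last set
  }

  public void advance(long millis) {
    time += millis; // tests move time forward explicitly instead of sleeping
  }
}

A test would pass such a clock wherever a Clock is expected and call advance(10000) to step past localityDelayTimeout, rather than sleeping for ten seconds of real time.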
[06/11] hive git commit: HIVE-13447 : LLAP: check ZK acls for
registry and fail if they are too permissive (Sergey Shelukhin,
reviewed by Prasanth Jayachandran)
Posted by jd...@apache.org.
HIVE-13447 : LLAP: check ZK acls for registry and fail if they are too permissive (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1289aff1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1289aff1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1289aff1
Branch: refs/heads/llap
Commit: 1289aff1adbff9f35032d4a6ed074207fe1eb4de
Parents: 134b6cc
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Apr 29 10:55:48 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Apr 29 10:55:48 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 3 +
.../impl/LlapZookeeperRegistryImpl.java | 74 +++++++++++++++-----
.../hadoop/hive/ql/exec/tez/DagUtils.java | 1 -
3 files changed, 60 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/1289aff1/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index fd725cb..db4b9e8 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2803,6 +2803,9 @@ public class HiveConf extends Configuration {
false,
"Whether to setup split locations to match nodes on which llap daemons are running," +
" instead of using the locations provided by the split itself"),
+ LLAP_VALIDATE_ACLS("hive.llap.validate.acls", true,
+ "Whether LLAP should reject permissive ACLs in some cases (e.g. its own management\n" +
+ "protocol or ZK paths), similar to how ssh refuses a key with bad access permissions."),
SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
"60s", new TimeValidator(TimeUnit.SECONDS),
http://git-wip-us.apache.org/repos/asf/hive/blob/1289aff1/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index d51249a..6981061 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -70,10 +70,12 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.client.ZooKeeperSaslClient;
import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class LlapZookeeperRegistryImpl implements ServiceRegistry {
@@ -88,10 +90,13 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
private static final String IPC_SHUFFLE = "shuffle";
private static final String IPC_LLAP = "llap";
private final static String ROOT_NAMESPACE = "llap";
+ private final static String USER_SCOPE_PATH_PREFIX = "user-";
private final Configuration conf;
private final CuratorFramework zooKeeperClient;
- private final String pathPrefix;
+ private final String pathPrefix, userPathPrefix;
+ private String userNameFromPrincipal; // Only set when setting up the secure config for ZK.
+
private PersistentEphemeralNode znode;
private String znodePath; // unique identity for this instance
private final ServiceRecordMarshal encoder; // to marshal/unmarshal znode data
@@ -125,23 +130,23 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
@Override
public List<ACL> getDefaultAcl() {
- List<ACL> nodeAcls = new ArrayList<ACL>();
- if (UserGroupInformation.isSecurityEnabled()) {
- // Read all to the world
- nodeAcls.addAll(ZooDefs.Ids.READ_ACL_UNSAFE);
- // Create/Delete/Write/Admin to the authenticated user
- nodeAcls.add(new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.AUTH_IDS));
- } else {
- // ACLs for znodes on a non-kerberized cluster
- // Create/Read/Delete/Write/Admin to the world
- nodeAcls.addAll(ZooDefs.Ids.OPEN_ACL_UNSAFE);
- }
- return nodeAcls;
+ // We always return something from getAclForPath so this should not happen.
+ LOG.warn("getDefaultAcl was called");
+ return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE);
}
@Override
public List<ACL> getAclForPath(String path) {
- return getDefaultAcl();
+ if (!UserGroupInformation.isSecurityEnabled() || path == null
+ || !path.contains(userPathPrefix)) {
+ // No security or the path is below the user path - full access.
+ return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE);
+ }
+ // Read all to the world
+ List<ACL> nodeAcls = new ArrayList<ACL>(ZooDefs.Ids.READ_ACL_UNSAFE);
+ // Create/Delete/Write/Admin to creator
+ nodeAcls.addAll(ZooDefs.Ids.CREATOR_ALL_ACL);
+ return nodeAcls;
}
};
@@ -172,7 +177,8 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
// worker does not respond due to communication interruptions it will retain the same sequence
// number when it returns back. If session timeout expires, the node will be deleted and new
// addition of the same node (restart) will get next sequence number
- this.pathPrefix = "/" + RegistryUtils.currentUser() + "/" + instanceName + "/workers/worker-";
+ this.userPathPrefix = USER_SCOPE_PATH_PREFIX + RegistryUtils.currentUser();
+ this.pathPrefix = "/" + userPathPrefix + "/" + instanceName + "/workers/worker-";
this.instancesCache = null;
this.instances = null;
this.stateChangeListeners = new HashSet<>();
@@ -276,9 +282,11 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
}
znodePath = znode.getActualPath();
+ if (HiveConf.getBoolVar(conf, ConfVars.LLAP_VALIDATE_ACLS)) {
+ checkAcls();
+ }
// Set a watch on the znode
- if (zooKeeperClient.checkExists()
- .forPath(znodePath) == null) {
+ if (zooKeeperClient.checkExists().forPath(znodePath) == null) {
// No node exists, throw exception
throw new Exception("Unable to create znode for this LLAP instance on ZooKeeper.");
}
@@ -297,6 +305,31 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
return uniq.toString();
}
+ private void checkAcls() throws Exception {
+ if (!UserGroupInformation.isSecurityEnabled()) return;
+ String pathToCheck = znodePath;
+ // We are trying to check ACLs on the "workers" directory, which no one except us should be
+ // able to write to. Higher-level directories shouldn't matter - we don't read them.
+ int ix = pathToCheck.lastIndexOf('/');
+ if (ix > 0) {
+ pathToCheck = pathToCheck.substring(0, ix);
+ }
+ List<ACL> acls = zooKeeperClient.usingNamespace(null).getACL().forPath(pathToCheck);
+ if (acls == null || acls.isEmpty()) {
+ // Can there be no ACLs? We evidently have some access (we just read them), so assume an empty list means free for all.
+ throw new SecurityException("No ACLs on " + pathToCheck);
+ }
+ // This could be brittle.
+ assert userNameFromPrincipal != null;
+ Id currentUser = new Id("sasl", userNameFromPrincipal);
+ for (ACL acl : acls) {
+ if ((acl.getPerms() & ~ZooDefs.Perms.READ) == 0 || currentUser.equals(acl.getId())) {
+ continue; // Read permission/no permissions, or the expected user.
+ }
+ throw new SecurityException("The ACL " + acl + " is unacceptable for " + pathToCheck);
+ }
+ }
+
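A minimal sketch of the acceptance rule applied to each ACL entry above (the names here are invented for illustration and are not part of the patch; ACL, Id and ZooDefs are the ZooKeeper classes already imported by this file):

    // An entry passes if it grants at most READ, or if it grants more
    // but is owned by our own SASL identity.
    Id us = new Id("sasl", "llap"); // hypothetical userNameFromPrincipal
    ACL worldRead = new ACL(ZooDefs.Perms.READ, ZooDefs.Ids.ANYONE_ID_UNSAFE); // passes: read-only
    ACL ownerAll = new ACL(ZooDefs.Perms.ALL, us); // passes: owned by us
    ACL otherWrite = new ACL(ZooDefs.Perms.WRITE, new Id("sasl", "other")); // SecurityException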
@Override
public void unregister() throws IOException {
// Nothing for the zkCreate models
@@ -643,6 +676,7 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, SASL_LOGIN_CONTEXT_NAME);
principal = SecurityUtil.getServerPrincipal(principal, "0.0.0.0");
+ userNameFromPrincipal = getUserNameFromPrincipal(principal);
JaasConfiguration jaasConf = new JaasConfiguration(SASL_LOGIN_CONTEXT_NAME, principal,
keyTabFile);
@@ -650,6 +684,12 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
javax.security.auth.login.Configuration.setConfiguration(jaasConf);
}
+ private String getUserNameFromPrincipal(String principal) {
+ // Based on SecurityUtil.
+ String[] components = principal.split("[/@]");
+ return (components == null || components.length != 3) ? principal : components[0];
+ }
+
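For example, splitting the hypothetical principal "llap/host.example.com@EXAMPLE.COM" on "[/@]" yields three components, so the short name "llap" is returned; a principal without a host part, such as "llap@EXAMPLE.COM", yields only two components and is returned unchanged.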
/**
* A JAAS configuration for ZooKeeper clients intended to use for SASL
* Kerberos.
http://git-wip-us.apache.org/repos/asf/hive/blob/1289aff1/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 8aca779..79da860 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -577,7 +577,6 @@ public class DagUtils {
}
}
- // TODO# HERE?
if (mapWork instanceof MergeFileWork) {
Path outputPath = ((MergeFileWork) mapWork).getOutputDir();
// prepare the tmp output directory. The output tmp directory should
[09/11] hive git commit: HIVE-13654: Add JAVA8_URL to jenkins-submit-build.sh
Posted by jd...@apache.org.
HIVE-13654: Add JAVA8_URL to jenkins-submit-build.sh
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9179178e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9179178e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9179178e
Branch: refs/heads/llap
Commit: 9179178e46420cb829f489a8ba59733554d8de4c
Parents: 324a2c6
Author: Sergio Pena <se...@cloudera.com>
Authored: Fri Apr 29 16:07:55 2016 -0500
Committer: Sergio Pena <se...@cloudera.com>
Committed: Fri Apr 29 16:07:55 2016 -0500
----------------------------------------------------------------------
dev-support/jenkins-common.sh | 14 ++++++++++++++
dev-support/jenkins-submit-build.sh | 4 ++++
2 files changed, 18 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/9179178e/dev-support/jenkins-common.sh
----------------------------------------------------------------------
diff --git a/dev-support/jenkins-common.sh b/dev-support/jenkins-common.sh
index 473fd16..dc98db5 100644
--- a/dev-support/jenkins-common.sh
+++ b/dev-support/jenkins-common.sh
@@ -101,3 +101,17 @@ process_jira() {
patch_contains_hms_upgrade() {
curl -s "$1" | grep "^diff.*metastore/scripts/upgrade/" >/dev/null
}
+
+string_to_upper_case() {
+ local str="$1"
+
+ echo "$str" | tr '[:lower:]' '[:upper:]'
+}
+
+get_jenkins_job_url() {
+ local branch="$1"
+ local varname=`string_to_upper_case $branch`_URL
+ local joburl=`eval echo \\$${varname}`
+
+ echo $joburl
+}
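A hypothetical usage sketch (the job URL is invented): given a per-branch *_URL variable exported by the Jenkins environment, get_jenkins_job_url resolves it from the branch name alone:

    # Resolves $JAVA8_URL by upper-casing the branch name and appending _URL.
    export JAVA8_URL='https://builds.example.org/job/Hive-JAVA8'
    get_jenkins_job_url java8   # prints https://builds.example.org/job/Hive-JAVA8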
http://git-wip-us.apache.org/repos/asf/hive/blob/9179178e/dev-support/jenkins-submit-build.sh
----------------------------------------------------------------------
diff --git a/dev-support/jenkins-submit-build.sh b/dev-support/jenkins-submit-build.sh
index a145e00..d055c38 100644
--- a/dev-support/jenkins-submit-build.sh
+++ b/dev-support/jenkins-submit-build.sh
@@ -48,6 +48,10 @@ case "$BUILD_PROFILE" in
test -n "$BEELINE_CLI_URL" || fail "BEELINE_CLI_URL must be specified"
url="$BEELINE_CLI_URL&ISSUE_NUM=$ISSUE_NUM"
;;
+ java8-mr2)
+ test -n "$JAVA8_URL" || fail "JAVA8_URL must be specified"
+ url="$JAVA8_URL&ISSUE_NUM=$ISSUE_NUM"
+ ;;
*)
echo "Unknown profile '$BUILD_PROFILE'"
exit 1