Posted to commits@hive.apache.org by jd...@apache.org on 2016/05/01 03:30:55 UTC

[01/11] hive git commit: HIVE-13588: NPE is thrown from MapredLocalTask.executeInChildVM (Chaoyu Tang, reviewed by Yongzhi Chen)

Repository: hive
Updated Branches:
  refs/heads/llap 390cb8cd0 -> 9f999f252


HIVE-13588: NPE is thrown from MapredLocalTask.executeInChildVM (Chaoyu Tang, reviewed by Yongzhi Chen)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ba864a24
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ba864a24
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ba864a24

Branch: refs/heads/llap
Commit: ba864a241b08c7916be1eefe08f7972451aeda15
Parents: 0ebcd93
Author: ctang <ct...@cloudera.com>
Authored: Thu Apr 28 22:06:47 2016 -0400
Committer: ctang <ct...@cloudera.com>
Committed: Thu Apr 28 22:06:47 2016 -0400

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/mr/MapredLocalTask.java     | 16 ++++++++++++----
 .../clientpositive/auto_sortmerge_join_8.q.out      |  2 --
 .../results/clientpositive/llap/tez_join_hash.q.out |  4 ----
 .../results/clientpositive/tez/tez_join_hash.q.out  |  4 ----
 4 files changed, 12 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/ba864a24/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
index 3c1f0de..24bf506 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
@@ -319,10 +319,18 @@ public class MapredLocalTask extends Task<MapredLocalWork> implements Serializab
 
       CachingPrintStream errPrintStream = new CachingPrintStream(System.err);
 
-      StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out,
-        OperationLog.getCurrentOperationLog().getPrintStream());
-      StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, errPrintStream,
-        OperationLog.getCurrentOperationLog().getPrintStream());
+      StreamPrinter outPrinter;
+      StreamPrinter errPrinter;
+      OperationLog operationLog = OperationLog.getCurrentOperationLog();
+      if (operationLog != null) {
+        outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out,
+            operationLog.getPrintStream());
+        errPrinter = new StreamPrinter(executor.getErrorStream(), null, errPrintStream,
+            operationLog.getPrintStream());
+      } else {
+        outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out);
+        errPrinter = new StreamPrinter(executor.getErrorStream(), null, errPrintStream);
+      }
 
       outPrinter.start();
       errPrinter.start();
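
For reference, the failure mode in one place: OperationLog.getCurrentOperationLog() reads a thread-local and returns null whenever no HiveServer2 operation log is attached (plain CLI runs, for example), so dereferencing it unconditionally threw the NPE. A minimal standalone sketch of the guard, assuming the StreamPrinter constructors shown in the hunk above (trailing output streams optional):

    // Sketch of the guard above: only pass the operation-log stream
    // when an operation log actually exists for this thread.
    OperationLog operationLog = OperationLog.getCurrentOperationLog();
    StreamPrinter outPrinter = (operationLog != null)
        ? new StreamPrinter(executor.getInputStream(), null, System.out,
            operationLog.getPrintStream())
        : new StreamPrinter(executor.getInputStream(), null, System.out);
    outPrinter.start();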

http://git-wip-us.apache.org/repos/asf/hive/blob/ba864a24/ql/src/test/results/clientpositive/auto_sortmerge_join_8.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/auto_sortmerge_join_8.q.out b/ql/src/test/results/clientpositive/auto_sortmerge_join_8.q.out
index ce0590c..23a3685 100644
--- a/ql/src/test/results/clientpositive/auto_sortmerge_join_8.q.out
+++ b/ql/src/test/results/clientpositive/auto_sortmerge_join_8.q.out
@@ -1411,8 +1411,6 @@ PREHOOK: Input: default@bucket_small
 PREHOOK: Input: default@bucket_small@ds=2008-04-08
 PREHOOK: Input: default@bucket_small@ds=2008-04-09
 #### A masked pattern was here ####
-FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
-ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
 POSTHOOK: query: select count(*) FROM bucket_big a JOIN bucket_small b ON a.key = b.key
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@bucket_big

http://git-wip-us.apache.org/repos/asf/hive/blob/ba864a24/ql/src/test/results/clientpositive/llap/tez_join_hash.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/tez_join_hash.q.out b/ql/src/test/results/clientpositive/llap/tez_join_hash.q.out
index 1fd45aa..54ca9d2 100644
--- a/ql/src/test/results/clientpositive/llap/tez_join_hash.q.out
+++ b/ql/src/test/results/clientpositive/llap/tez_join_hash.q.out
@@ -652,10 +652,6 @@ PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
 PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
 PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
 #### A masked pattern was here ####
-FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
-ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
-FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
-ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
 POSTHOOK: query: select key, count(*) from (select x.key as key, y.value as value from
 srcpart x join srcpart y on (x.key = y.key)
 union all

http://git-wip-us.apache.org/repos/asf/hive/blob/ba864a24/ql/src/test/results/clientpositive/tez/tez_join_hash.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/tez/tez_join_hash.q.out b/ql/src/test/results/clientpositive/tez/tez_join_hash.q.out
index 2f51094..8d0aba1 100644
--- a/ql/src/test/results/clientpositive/tez/tez_join_hash.q.out
+++ b/ql/src/test/results/clientpositive/tez/tez_join_hash.q.out
@@ -638,10 +638,6 @@ PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
 PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
 PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
 #### A masked pattern was here ####
-FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
-ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
-FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask
-ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.mr.MapRedTask
 POSTHOOK: query: select key, count(*) from (select x.key as key, y.value as value from
 srcpart x join srcpart y on (x.key = y.key)
 union all


[08/11] hive git commit: HIVE-12963 : LIMIT statement with SORT BY creates additional MR job with hardcoded only one reducer (Alina Abramova, reviewed by Sergey Shelukhin)

Posted by jd...@apache.org.
HIVE-12963 : LIMIT statement with SORT BY creates additional MR job with hardcoded only one reducer (Alina Abramova, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/324a2c6e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/324a2c6e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/324a2c6e

Branch: refs/heads/llap
Commit: 324a2c6ec86aba7c1baf74caf52b615b400825c9
Parents: 6460529
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Apr 29 12:13:18 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Apr 29 12:13:18 2016 -0700

----------------------------------------------------------------------
 common/src/java/org/apache/hadoop/hive/conf/HiveConf.java         | 2 ++
 itests/src/test/resources/testconfiguration.properties            | 1 +
 ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java | 2 +-
 ql/src/test/results/clientpositive/groupby1_limit.q.out           | 2 +-
 4 files changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/324a2c6e/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 46a3b96..b13de92 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1009,6 +1009,8 @@ public class HiveConf extends Configuration {
         "This parameter decides if Hive should add an additional map-reduce job. If the grouping set\n" +
         "cardinality (4 in the example above), is more than this value, a new MR job is added under the\n" +
         "assumption that the original group by will reduce the data size."),
+    HIVE_GROUPBY_LIMIT_EXTRASTEP("hive.groupby.limit.extrastep", true, "This parameter decides if Hive should \n" +
+        "create new MR job for sorting final output"),
 
     // Max filesize used to do a single copy (after that, distcp is used)
     HIVE_EXEC_COPYFILE_MAXSIZE("hive.exec.copyfile.maxsize", 32L * 1024 * 1024 /*32M*/,

http://git-wip-us.apache.org/repos/asf/hive/blob/324a2c6e/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index e46e6ce..0ef3161 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -874,6 +874,7 @@ spark.query.files=add_part_multiple.q, \
   groupby_sort_1_23.q, \
   groupby_sort_skew_1.q, \
   groupby_sort_skew_1_23.q, \
+  qroupby_limit_extrastep.q, \
   having.q, \
   identity_project_remove_skip.q, \
   index_auto_self_join.q, \

http://git-wip-us.apache.org/repos/asf/hive/blob/324a2c6e/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index cfe4497..06db7f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -7300,7 +7300,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     Operator curr = genLimitPlan(dest, qb, input, offset, limit);
 
     // the client requested that an extra map-reduce step be performed
-    if (!extraMRStep) {
+    if (!extraMRStep  || !HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_GROUPBY_LIMIT_EXTRASTEP)){
       return curr;
     }
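
A hedged sketch of opting out of the extra stage from application code, using the ConfVar added in the HiveConf hunk above (SET hive.groupby.limit.extrastep=false; does the same at the session level):

    // Sketch: disable the additional single-reducer MR job that sorts
    // the final output of LIMIT queries (default remains true).
    HiveConf conf = new HiveConf();
    conf.setBoolVar(HiveConf.ConfVars.HIVE_GROUPBY_LIMIT_EXTRASTEP, false);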
 

http://git-wip-us.apache.org/repos/asf/hive/blob/324a2c6e/ql/src/test/results/clientpositive/groupby1_limit.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/groupby1_limit.q.out b/ql/src/test/results/clientpositive/groupby1_limit.q.out
index aacd23c..8d7fbfa 100644
--- a/ql/src/test/results/clientpositive/groupby1_limit.q.out
+++ b/ql/src/test/results/clientpositive/groupby1_limit.q.out
@@ -68,7 +68,7 @@ STAGE PLANS:
       Map Operator Tree:
           TableScan
             Reduce Output Operator
-              sort order: 
+              sort order:
               Statistics: Num rows: 5 Data size: 50 Basic stats: COMPLETE Column stats: NONE
               TopN Hash Memory Usage: 0.1
               value expressions: _col0 (type: string), _col1 (type: double)


[11/11] hive git commit: Merge branch 'master' into llap

Posted by jd...@apache.org.
Merge branch 'master' into llap

Conflicts:
	common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
	llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9f999f25
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9f999f25
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9f999f25

Branch: refs/heads/llap
Commit: 9f999f252746a11c766624ca947e31f3fe59ec07
Parents: 390cb8c 9e1fa0c
Author: Jason Dere <jd...@hortonworks.com>
Authored: Sat Apr 30 18:30:25 2016 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Sat Apr 30 18:30:25 2016 -0700

----------------------------------------------------------------------
 HIVE-13509.2.patch                              | 478 ++++++++++++
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  10 +-
 dev-support/jenkins-common.sh                   |  14 +
 dev-support/jenkins-submit-build.sh             |   4 +
 .../hive/hcatalog/common/HCatConstants.java     |   3 +
 .../hcatalog/mapreduce/HCatBaseInputFormat.java |  29 +-
 .../hive/hcatalog/pig/TestHCatLoader.java       |  55 ++
 .../service/cli/session/TestQueryDisplay.java   |   4 +-
 .../test/resources/testconfiguration.properties |   1 +
 .../impl/LlapZookeeperRegistryImpl.java         |  74 +-
 .../hive/llap/tezplugins/ContainerFactory.java  |   3 +-
 .../tezplugins/LlapTaskSchedulerService.java    | 377 ++++++++--
 .../llap/tezplugins/helpers/MonotonicClock.java |  24 +
 .../scheduler/LoggingFutureCallback.java        |  44 ++
 .../TestLlapTaskSchedulerService.java           | 734 ++++++++++++++++++-
 .../hadoop/hive/metastore/HiveMetaStore.java    |   2 +-
 .../metastore/TestHiveMetaStoreGetMetaConf.java | 151 ++++
 .../java/org/apache/hadoop/hive/ql/Driver.java  |  13 +-
 .../org/apache/hadoop/hive/ql/QueryDisplay.java |  19 +-
 .../org/apache/hadoop/hive/ql/QueryPlan.java    |  16 +-
 .../hadoop/hive/ql/exec/ConditionalTask.java    |   1 +
 .../apache/hadoop/hive/ql/exec/Registry.java    | 323 +++++---
 .../org/apache/hadoop/hive/ql/exec/Task.java    |  14 +-
 .../hadoop/hive/ql/exec/mr/ExecDriver.java      |   2 +-
 .../hive/ql/exec/mr/HadoopJobExecHelper.java    |   6 +-
 .../hadoop/hive/ql/exec/mr/MapredLocalTask.java |  16 +-
 .../hadoop/hive/ql/exec/tez/DagUtils.java       |   1 -
 .../apache/hadoop/hive/ql/metadata/Hive.java    |  39 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |   2 +-
 .../hadoop/hive/ql/session/SessionState.java    |   3 +-
 .../clientpositive/auto_sortmerge_join_8.q.out  |   2 -
 .../results/clientpositive/groupby1_limit.q.out |   2 +-
 .../clientpositive/llap/tez_join_hash.q.out     |   4 -
 .../clientpositive/tez/tez_join_hash.q.out      |   4 -
 .../apache/hive/service/cli/CLIServiceTest.java |   4 +-
 .../apache/hadoop/hive/shims/ShimLoader.java    |  10 +-
 36 files changed, 2172 insertions(+), 316 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/9f999f25/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --cc common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index d7afa4d,b13de92..7db492f
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@@ -2806,8 -2808,9 +2811,11 @@@ public class HiveConf extends Configura
          false,
          "Whether to setup split locations to match nodes on which llap daemons are running," +
              " instead of using the locations provided by the split itself"),
+     LLAP_VALIDATE_ACLS("hive.llap.validate.acls", true,
+         "Whether LLAP should reject permissive ACLs in some cases (e.g. its own management\n" +
+         "protocol or ZK paths), similar to how ssh refuses a key with bad access permissions."),
 +    LLAP_DAEMON_OUTPUT_SERVICE_PORT("hive.llap.daemon.output.service.port", 15003,
 +        "LLAP daemon output service port"),
  
      SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
        "60s", new TimeValidator(TimeUnit.SECONDS),

http://git-wip-us.apache.org/repos/asf/hive/blob/9f999f25/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
----------------------------------------------------------------------
diff --cc llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index 0dc7599,6981061..fde70e7
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@@ -87,8 -89,8 +89,9 @@@ public class LlapZookeeperRegistryImpl 
    private static final String IPC_MNG = "llapmng";
    private static final String IPC_SHUFFLE = "shuffle";
    private static final String IPC_LLAP = "llap";
 +  private static final String IPC_OUTPUTFORMAT = "llapoutputformat";
    private final static String ROOT_NAMESPACE = "llap";
+   private final static String USER_SCOPE_PATH_PREFIX = "user-";
  
    private final Configuration conf;
    private final CuratorFramework zooKeeperClient;
@@@ -173,7 -177,8 +178,8 @@@
      // worker does not respond due to communication interruptions it will retain the same sequence
      // number when it returns back. If session timeout expires, the node will be deleted and new
      // addition of the same node (restart) will get next sequence number
-     this.pathPrefix = "/" + getZkPathUser(this.conf) + "/" + instanceName + "/workers/worker-";
 -    this.userPathPrefix = USER_SCOPE_PATH_PREFIX + RegistryUtils.currentUser();
++    this.userPathPrefix = USER_SCOPE_PATH_PREFIX + getZkPathUser(this.conf);
+     this.pathPrefix = "/" + userPathPrefix + "/" + instanceName + "/workers/worker-";
      this.instancesCache = null;
      this.instances = null;
      this.stateChangeListeners = new HashSet<>();
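
To make the resolved concatenation concrete, a worked example with illustrative values (user "hive" as returned by getZkPathUser(conf), instance name "llap0" assumed):

    // Illustrative values only:
    String userPathPrefix = "user-" + "hive";   // USER_SCOPE_PATH_PREFIX + getZkPathUser(this.conf)
    String pathPrefix = "/" + userPathPrefix + "/llap0/workers/worker-";
    // ZooKeeper appends the sequence number on registration, e.g.:
    //   /user-hive/llap0/workers/worker-0000000001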

http://git-wip-us.apache.org/repos/asf/hive/blob/9f999f25/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------


[05/11] hive git commit: HIVE-13510 : Dynamic partitioning doesn’t work when remote metastore is used (Illya Yalovyy via Ashutosh Chauhan)

Posted by jd...@apache.org.
HIVE-13510 : Dynamic partitioning doesn't work when remote metastore is used (Illya Yalovyy via Ashutosh Chauhan)

Signed-off-by: Ashutosh Chauhan <ha...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/134b6ccc
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/134b6ccc
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/134b6ccc

Branch: refs/heads/llap
Commit: 134b6cccbd7237901f7f7594626796863ca0150a
Parents: 347a5a5
Author: Illya Yalovyy <ya...@amazon.com>
Authored: Tue Apr 26 12:18:00 2016 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Fri Apr 29 03:36:36 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/metastore/HiveMetaStore.java    |   2 +-
 .../metastore/TestHiveMetaStoreGetMetaConf.java | 151 +++++++++++++++++++
 2 files changed, 152 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/134b6ccc/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index ed2057a..4ada9c1 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -527,7 +527,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       if (confVar == null) {
         throw new MetaException("Invalid configuration key " + key);
       }
-      return getConf().get(key);
+      return getConf().get(key, confVar.getDefaultValue());
     }
 
     /**
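
The effect, as exercised by the new test class below, in a hedged client-side sketch (default value of hive.metastore.try.direct.sql assumed to be "true"):

    // Sketch: an unset metaconf key now returns the ConfVar default
    // instead of null.
    HiveMetaStoreClient client = new HiveMetaStoreClient(hiveConf);
    String value = client.getMetaConf(
        HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.toString());
    // value is "true" unless the server overrides it
    client.close();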

http://git-wip-us.apache.org/repos/asf/hive/blob/134b6ccc/metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStoreGetMetaConf.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStoreGetMetaConf.java b/metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStoreGetMetaConf.java
new file mode 100644
index 0000000..3f4561c
--- /dev/null
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStoreGetMetaConf.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.security.Permission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.thrift.TException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.*;
+import org.junit.Before;
+
+public class TestHiveMetaStoreGetMetaConf {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestHiveMetaStoreGetMetaConf.class);
+  private static final String msPort = "20103";
+  private static HiveConf hiveConf;
+  private static SecurityManager securityManager;
+
+  private HiveMetaStoreClient hmsc;
+
+  public static class NoExitSecurityManager extends SecurityManager {
+
+    @Override
+    public void checkPermission(Permission perm) {
+      // allow anything.
+    }
+
+    @Override
+    public void checkPermission(Permission perm, Object context) {
+      // allow anything.
+    }
+
+    @Override
+    public void checkExit(int status) {
+      super.checkExit(status);
+      throw new RuntimeException("System.exit() was called. Raising exception.");
+    }
+  }
+
+  private static class RunMS implements Runnable {
+
+    @Override
+    public void run() {
+      try {
+        HiveMetaStore.main(new String[]{"-v", "-p", msPort, "--hiveconf",
+            "hive.metastore.expression.proxy=" + MockPartitionExpressionForMetastore.class.getCanonicalName(),
+            "--hiveconf", "hive.metastore.try.direct.sql.ddl=false"});
+      } catch (Throwable t) {
+        LOG.error("Exiting. Got exception from metastore: ", t);
+      }
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    LOG.info("Shutting down metastore.");
+    System.setSecurityManager(securityManager);
+  }
+
+  @BeforeClass
+  public static void startMetaStoreServer() throws Exception {
+
+    securityManager = System.getSecurityManager();
+    System.setSecurityManager(new NoExitSecurityManager());
+    
+    hiveConf = new HiveConf(TestHiveMetaStoreGetMetaConf.class);
+    hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:"
+        + msPort);
+    hiveConf.setVar(HiveConf.ConfVars.PREEXECHOOKS, "");
+    hiveConf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, "");
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 10);
+
+    System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " ");
+    System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " ");
+
+    new Thread(new RunMS()).start();
+  }
+
+  @Before
+  public void setup() throws MetaException {
+    hmsc = new HiveMetaStoreClient(hiveConf);
+  }
+
+  @After
+  public void closeClient() {
+    if (hmsc != null) {
+      hmsc.close();
+    }
+  }
+
+  @Test
+  public void testGetMetaConfDefault() throws MetaException, TException {
+    HiveConf.ConfVars metaConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL;
+    String expected = metaConfVar.getDefaultValue();
+    String actual = hmsc.getMetaConf(metaConfVar.toString());
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testGetMetaConfDefaultEmptyString() throws MetaException, TException {
+    HiveConf.ConfVars metaConfVar = HiveConf.ConfVars.METASTORE_PARTITION_NAME_WHITELIST_PATTERN;
+    String expected = "";
+    String actual = hmsc.getMetaConf(metaConfVar.toString());
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testGetMetaConfOverridden() throws MetaException, TException {
+    HiveConf.ConfVars metaConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL_DDL;
+    String expected = "false";
+    String actual = hmsc.getMetaConf(metaConfVar.toString());
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testGetMetaConfUnknownPreperty() throws MetaException, TException {
+    String unknownPropertyName = "hive.meta.foo.bar";
+    thrown.expect(MetaException.class);
+    thrown.expectMessage("Invalid configuration key " + unknownPropertyName);
+    hmsc.getMetaConf(unknownPropertyName);
+  }
+}


[03/11] hive git commit: HIVE-13572 : Redundant setting full file status in Hive::copyFiles (Rui Li via Ashutosh Chauhan)

Posted by jd...@apache.org.
HIVE-13572 : Redundant setting full file status in Hive::copyFiles (Rui Li via Ashutosh Chauhan)

Signed-off-by: Ashutosh Chauhan <ha...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/076f3655
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/076f3655
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/076f3655

Branch: refs/heads/llap
Commit: 076f3655b846f3214bc304e405d321222a3819ab
Parents: 4377c7f
Author: Rui Li <ru...@intel.com>
Authored: Tue Apr 26 18:14:00 2016 -0800
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Thu Apr 28 19:22:13 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/metadata/Hive.java    | 39 +++++++++++---------
 .../apache/hadoop/hive/shims/ShimLoader.java    | 10 +++--
 2 files changed, 28 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/076f3655/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index ab165f1..4d9c3d2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -2641,26 +2641,29 @@ private void constructOneLBLocationMap(FileStatus fSta,
         files = new FileStatus[] {src};
       }
 
-      for (FileStatus srcFile : files) {
-
-        final Path srcP = srcFile.getPath();
-        final boolean needToCopy = needToCopy(srcP, destf, srcFs, destFs);
-        // Strip off the file type, if any so we don't make:
-        // 000000_0.gz -> 000000_0.gz_copy_1
-        final String name;
-        final String filetype;
-        String itemName = srcP.getName();
-        int index = itemName.lastIndexOf('.');
-        if (index >= 0) {
-          filetype = itemName.substring(index);
-          name = itemName.substring(0, index);
-        } else {
-          name = itemName;
-          filetype = "";
-        }
+      final SessionState parentSession = SessionState.get();
+      for (final FileStatus srcFile : files) {
+
         futures.add(pool.submit(new Callable<ObjectPair<Path, Path>>() {
           @Override
           public ObjectPair<Path, Path> call() throws Exception {
+            SessionState.setCurrentSessionState(parentSession);
+            final Path srcP = srcFile.getPath();
+            final boolean needToCopy = needToCopy(srcP, destf, srcFs, destFs);
+            // Strip off the file type, if any so we don't make:
+            // 000000_0.gz -> 000000_0.gz_copy_1
+            final String name;
+            final String filetype;
+            String itemName = srcP.getName();
+            int index = itemName.lastIndexOf('.');
+            if (index >= 0) {
+              filetype = itemName.substring(index);
+              name = itemName.substring(0, index);
+            } else {
+              name = itemName;
+              filetype = "";
+            }
+
             Path destPath = new Path(destf, srcP.getName());
             if (!needToCopy && !isSrcLocal) {
               for (int counter = 1; !destFs.rename(srcP,destPath); counter++) {
@@ -2671,7 +2674,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
             }
 
             if (inheritPerms) {
-              ShimLoader.getHadoopShims().setFullFileStatus(conf, fullDestStatus, destFs, destf);
+              ShimLoader.getHadoopShims().setFullFileStatus(conf, fullDestStatus, destFs, destPath);
             }
             if (null != newFiles) {
               newFiles.add(destPath);
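
Besides pointing setFullFileStatus at destPath, the hunk moves the per-file work into the Callable and re-installs the caller's SessionState on each pool thread, since SessionState lives in a thread-local that worker threads do not inherit. A condensed sketch of that pattern:

    // Capture the submitting thread's session, then restore it inside
    // every pool task before doing per-file work.
    final SessionState parentSession = SessionState.get();
    futures.add(pool.submit(new Callable<Void>() {
      @Override
      public Void call() throws Exception {
        SessionState.setCurrentSessionState(parentSession);
        // ... rename/copy one file, as in the hunk above ...
        return null;
      }
    }));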

http://git-wip-us.apache.org/repos/asf/hive/blob/076f3655/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java
index 0fe3169..28d3e48 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/ShimLoader.java
@@ -31,7 +31,7 @@ import java.util.Map;
 public abstract class ShimLoader {
   public static String HADOOP23VERSIONNAME = "0.23";
 
-  private static HadoopShims hadoopShims;
+  private static volatile HadoopShims hadoopShims;
   private static JettyShims jettyShims;
   private static AppenderSkeleton eventCounter;
   private static HadoopThriftAuthBridge hadoopThriftAuthBridge;
@@ -88,9 +88,13 @@ public abstract class ShimLoader {
    * Factory method to get an instance of HadoopShims based on the
    * version of Hadoop on the classpath.
    */
-  public static synchronized HadoopShims getHadoopShims() {
+  public static HadoopShims getHadoopShims() {
     if (hadoopShims == null) {
-      hadoopShims = loadShims(HADOOP_SHIM_CLASSES, HadoopShims.class);
+      synchronized (ShimLoader.class) {
+        if (hadoopShims == null) {
+          hadoopShims = loadShims(HADOOP_SHIM_CLASSES, HadoopShims.class);
+        }
+      }
     }
     return hadoopShims;
   }
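
The getHadoopShims() rewrite is the standard double-checked locking idiom; it is only correct because the first hunk makes hadoopShims volatile. A self-contained sketch (class name invented for illustration):

    public final class LazyHolder {
      // volatile guarantees threads taking the unsynchronized fast path
      // see a fully-constructed instance.
      private static volatile LazyHolder instance;

      private LazyHolder() { }

      public static LazyHolder get() {
        if (instance == null) {             // fast path after initialization
          synchronized (LazyHolder.class) {
            if (instance == null) {         // re-check under the lock
              instance = new LazyHolder();
            }
          }
        }
        return instance;
      }
    }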


[10/11] hive git commit: HIVE-13421 : Propagate job progress in operation status

Posted by jd...@apache.org.
HIVE-13421 : Propagate job progress in operation status


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9e1fa0ce
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9e1fa0ce
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9e1fa0ce

Branch: refs/heads/llap
Commit: 9e1fa0ce6f2003300640f0bee9b267e33d714084
Parents: 9179178
Author: Rajat Khandelwal <pr...@apache.org>
Authored: Sat Apr 30 08:16:51 2016 +0530
Committer: Amareshwari Sriramadasu <am...@apache.org>
Committed: Sat Apr 30 08:16:51 2016 +0530

----------------------------------------------------------------------
 .../service/cli/session/TestQueryDisplay.java    |  4 ++--
 .../java/org/apache/hadoop/hive/ql/Driver.java   | 13 +++++++++++--
 .../org/apache/hadoop/hive/ql/QueryDisplay.java  | 19 +++++++++++++++++--
 .../org/apache/hadoop/hive/ql/QueryPlan.java     | 16 +---------------
 .../hadoop/hive/ql/exec/ConditionalTask.java     |  1 +
 .../org/apache/hadoop/hive/ql/exec/Task.java     | 14 ++++++++++++--
 .../hadoop/hive/ql/exec/mr/ExecDriver.java       |  2 +-
 .../hive/ql/exec/mr/HadoopJobExecHelper.java     |  6 ++++--
 .../apache/hive/service/cli/CLIServiceTest.java  |  4 +++-
 9 files changed, 52 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
index 98581e0..cc18ce7 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
@@ -154,8 +154,8 @@ public class TestQueryDisplay {
     Assert.assertTrue(qDisplay1.getPerfLogStarts(QueryDisplay.Phase.COMPILATION).size() > 0);
     Assert.assertTrue(qDisplay1.getPerfLogEnds(QueryDisplay.Phase.COMPILATION).size() > 0);
 
-    Assert.assertEquals(qDisplay1.getTaskDisplays().size(), 2);
-    QueryDisplay.TaskDisplay tInfo1 = qDisplay1.getTaskDisplays().get(1);
+    Assert.assertEquals(qDisplay1.getTaskDisplays().size(), 1);
+    QueryDisplay.TaskDisplay tInfo1 = qDisplay1.getTaskDisplays().get(0);
     Assert.assertEquals(tInfo1.getTaskId(), "Stage-0");
     Assert.assertEquals(tInfo1.getTaskType(), StageType.DDL);
     Assert.assertTrue(tInfo1.getBeginTime() > 0 && tInfo1.getBeginTime() <= System.currentTimeMillis());

http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index dad43fb..32d2cb2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -450,7 +450,7 @@ public class Driver implements CommandProcessor {
       schema = getSchema(sem, conf);
 
       plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId,
-        queryState.getHiveOperation(), schema, queryDisplay);
+        queryState.getHiveOperation(), schema);
 
       conf.setQueryString(queryStr);
 
@@ -1507,7 +1507,7 @@ public class Driver implements CommandProcessor {
           perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PRE_HOOK + peh.getClass().getName());
         }
       }
-
+      setQueryDisplays(plan.getRootTasks());
       int mrJobs = Utilities.getMRTasks(plan.getRootTasks()).size();
       int jobs = mrJobs
         + Utilities.getTezTasks(plan.getRootTasks()).size()
@@ -1739,6 +1739,15 @@ public class Driver implements CommandProcessor {
     return (0);
   }
 
+  private void setQueryDisplays(List<Task<? extends Serializable>> tasks) {
+    if (tasks != null) {
+      for (Task<? extends Serializable> task : tasks) {
+        task.setQueryDisplay(queryDisplay);
+        setQueryDisplays(task.getDependentTasks());
+      }
+    }
+  }
+
   private void logMrWarning(int mrJobs) {
     if (mrJobs <= 0 || !("mr".equals(HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE)))) {
       return;

http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java
index d582bc0..703e997 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java
@@ -79,6 +79,8 @@ public class QueryDisplay {
     private String name;
     private boolean requireLock;
     private boolean retryIfFail;
+    private String statusMessage;
+
     // required for jackson
     public TaskDisplay() {
 
@@ -158,15 +160,28 @@ public class QueryDisplay {
       if (externalHandle == null && tTask.getExternalHandle() != null) {
         this.externalHandle = tTask.getExternalHandle();
       }
+      setStatusMessage(tTask.getStatusMessage());
       switch (taskState) {
         case RUNNING:
-          beginTime = System.currentTimeMillis();
+          if (beginTime == null) {
+            beginTime = System.currentTimeMillis();
+          }
           break;
         case FINISHED:
-          endTime = System.currentTimeMillis();
+          if (endTime == null) {
+            endTime = System.currentTimeMillis();
+          }
           break;
       }
     }
+
+    public synchronized String getStatusMessage() {
+      return statusMessage;
+    }
+
+    public synchronized void setStatusMessage(String statusMessage) {
+      this.statusMessage = statusMessage;
+    }
   }
   public synchronized void setTaskResult(String taskId, TaskResult result) {
     TaskDisplay taskDisplay = tasks.get(taskId);

http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
index ef0923d..e8c8ae6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
@@ -113,26 +113,12 @@ public class QueryPlan implements Serializable {
   }
 
   public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId,
-                   HiveOperation operation, Schema resultSchema) {
-    this(queryString, sem, startTime, queryId, operation, resultSchema, null);
-  }
-  public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId,
-                  HiveOperation operation, Schema resultSchema, QueryDisplay queryDisplay) {
+                  HiveOperation operation, Schema resultSchema) {
     this.queryString = queryString;
 
     rootTasks = new ArrayList<Task<? extends Serializable>>(sem.getAllRootTasks());
     reducerTimeStatsPerJobList = new ArrayList<ReducerTimeStatsPerJob>();
     fetchTask = sem.getFetchTask();
-    if (queryDisplay != null) {
-      if (fetchTask != null) {
-        fetchTask.setQueryDisplay(queryDisplay);
-      }
-      if (rootTasks!= null) {
-        for (Task t : rootTasks) {
-          t.setQueryDisplay(queryDisplay);
-        }
-      }
-    }
     // Note that inputs and outputs can be changed when the query gets executed
     inputs = sem.getAllInputs();
     outputs = sem.getAllOutputs();

http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
index c96c813..52cb445 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
@@ -200,6 +200,7 @@ public class ConditionalTask extends Task<ConditionalWork> implements Serializab
   public boolean addDependentTask(Task<? extends Serializable> dependent) {
     boolean ret = false;
     if (getListTasks() != null) {
+      ret = true;
       for (Task<? extends Serializable> tsk : getListTasks()) {
         ret = ret & tsk.addDependentTask(dependent);
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index 897af5e..34bdafd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -83,8 +83,18 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
   protected String id;
   protected T work;
   private TaskState taskState = TaskState.CREATED;
+  private String statusMessage;
   private transient boolean fetchSource;
 
+  public void setStatusMessage(String statusMessage) {
+    this.statusMessage = statusMessage;
+    updateStatusInQueryDisplay();
+  }
+
+  public String getStatusMessage() {
+    return statusMessage;
+  }
+
   public enum FeedType {
     DYNAMIC_PARTITIONS, // list of dynamic partitions
   }
@@ -138,13 +148,13 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
     this.queryDisplay = queryDisplay;
   }
 
-  private void updateStatusInQueryDisplay() {
+  protected void updateStatusInQueryDisplay() {
     if (queryDisplay != null) {
       queryDisplay.updateTaskStatus(this);
     }
   }
 
-  private void setState(TaskState state) {
+  protected void setState(TaskState state) {
     this.taskState = state;
     updateStatusInQueryDisplay();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index 639b0da..926f6e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -432,7 +432,7 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
       // Finally SUBMIT the JOB!
       rj = jc.submitJob(job);
       this.jobID = rj.getJobID();
-
+      updateStatusInQueryDisplay();
       returnVal = jobExecHelper.progress(rj, jc);
       success = (returnVal == 0);
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
index 760ba6c..11f5cfd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
@@ -367,12 +367,14 @@ public class HadoopJobExecHelper {
         }
       }
       console.printInfo(output);
+      task.setStatusMessage(output);
       reportTime = System.currentTimeMillis();
     }
 
     if (cpuMsec > 0) {
-      console.printInfo("MapReduce Total cumulative CPU time: "
-          + Utilities.formatMsecToStr(cpuMsec));
+      String status = "MapReduce Total cumulative CPU time: " + Utilities.formatMsecToStr(cpuMsec);
+      console.printInfo(status);
+      task.setStatusMessage(status);
     }
 
     boolean success;
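
Taken together, Task.setStatusMessage() records each progress line, HadoopJobExecHelper feeds it the console output, and Driver.setQueryDisplays() wires every task to the shared QueryDisplay. A hedged sketch of reading the result back (accessors as they appear in the hunks and tests in this patch; the loop itself is illustrative):

    // Illustrative: print the latest MapReduce progress string recorded
    // for each task attached by Driver.setQueryDisplays().
    for (QueryDisplay.TaskDisplay td : queryDisplay.getTaskDisplays()) {
      System.out.println(td.getTaskId() + " [" + td.getTaskType() + "]: "
          + td.getStatusMessage());
    }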

http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
index ff7e9a4..fb8ee4c 100644
--- a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
+++ b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
@@ -641,7 +641,8 @@ public abstract class CLIServiceTest {
     SessionHandle sessionHandle = setupTestData(tableName, columnDefinitions, confOverlay);
     assertNotNull(sessionHandle);
     // nonblocking execute
-    String select = "SELECT ID + ' ' FROM TEST_EXEC_ASYNC";
+    String select = "select a.id, b.id from (SELECT ID + ' ' `ID` FROM TEST_EXEC_ASYNC) a full outer join "
+      + "(SELECT ID + ' ' `ID` FROM TEST_EXEC_ASYNC) b on a.ID=b.ID";
     OperationHandle ophandle =
       client.executeStatementAsync(sessionHandle, select, confOverlay);
 
@@ -697,6 +698,7 @@ public abstract class CLIServiceTest {
         case FINISHED:
           if (taskDisplay.getTaskType() == StageType.MAPRED || taskDisplay.getTaskType() == StageType.MAPREDLOCAL) {
             assertNotNull(taskDisplay.getExternalHandle());
+            assertNotNull(taskDisplay.getStatusMessage());
           }
           assertNotNull(taskDisplay.getBeginTime());
           assertNotNull(taskDisplay.getEndTime());


[02/11] hive git commit: HIVE-13509: HCatalog getSplits should ignore the partition with invalid path (Chaoyu Tang, reviewed by Szehon Ho and Mithun Radhakrishnan)

Posted by jd...@apache.org.
HIVE-13509: HCatalog getSplits should ignore the partition with invalid path (Chaoyu Tang, reviewed by Szehon Ho and Mithun Radhakrishnan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4377c7ff
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4377c7ff
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4377c7ff

Branch: refs/heads/llap
Commit: 4377c7ff03c93f0b1c69305a1b8852f579a3130b
Parents: ba864a2
Author: ctang <ct...@cloudera.com>
Authored: Thu Apr 28 22:10:59 2016 -0400
Committer: ctang <ct...@cloudera.com>
Committed: Thu Apr 28 22:10:59 2016 -0400

----------------------------------------------------------------------
 HIVE-13509.2.patch                              | 478 +++++++++++++++++++
 .../hive/hcatalog/common/HCatConstants.java     |   3 +
 .../hcatalog/mapreduce/HCatBaseInputFormat.java |  29 +-
 .../hive/hcatalog/pig/TestHCatLoader.java       |  55 +++
 4 files changed, 559 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
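
The behavior is opt-in via the new hcat.input.ignore.invalid.path key (default false, per the constants in the patch below); a hedged job-setup sketch, assuming the usual org.apache.hadoop.mapreduce imports:

    // Sketch: let HCatBaseInputFormat.setInputPath() drop partitions
    // whose directories no longer exist instead of failing getSplits().
    Job job = Job.getInstance(new Configuration());
    job.getConfiguration().setBoolean("hcat.input.ignore.invalid.path", true);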


http://git-wip-us.apache.org/repos/asf/hive/blob/4377c7ff/HIVE-13509.2.patch
----------------------------------------------------------------------
diff --git a/HIVE-13509.2.patch b/HIVE-13509.2.patch
new file mode 100644
index 0000000..930b1f7
--- /dev/null
+++ b/HIVE-13509.2.patch
@@ -0,0 +1,478 @@
+diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
+index 6b03fcb..d165e7e 100644
+--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
++++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
+@@ -208,4 +208,7 @@ private HCatConstants() { // restrict instantiation
+    */
+   public static final String HCAT_INPUT_BAD_RECORD_MIN_KEY = "hcat.input.bad.record.min";
+   public static final int HCAT_INPUT_BAD_RECORD_MIN_DEFAULT = 2;
++
++  public static final String HCAT_INPUT_IGNORE_INVALID_PATH_KEY = "hcat.input.ignore.invalid.path";
++  public static final boolean HCAT_INPUT_IGNORE_INVALID_PATH_DEFAULT = false;
+ }
+diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
+index adfaf4e..dbbdd61 100644
+--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
++++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
+@@ -21,11 +21,11 @@
+ 
+ import java.io.IOException;
+ import java.util.ArrayList;
++import java.util.Iterator;
+ import java.util.LinkedList;
+ import java.util.Map;
+ import java.util.HashMap;
+ import java.util.List;
+-
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+@@ -127,7 +127,10 @@ public static void setOutputSchema(Job job, HCatSchema hcatSchema)
+     //For each matching partition, call getSplits on the underlying InputFormat
+     for (PartInfo partitionInfo : partitionInfoList) {
+       jobConf = HCatUtil.getJobConfFromContext(jobContext);
+-      setInputPath(jobConf, partitionInfo.getLocation());
++      List<String> setInputPath = setInputPath(jobConf, partitionInfo.getLocation());
++      if (setInputPath.isEmpty()) {
++        continue;
++      }
+       Map<String, String> jobProperties = partitionInfo.getJobProperties();
+ 
+       HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
+@@ -281,7 +284,7 @@ private static InputJobInfo getJobInfo(Configuration conf)
+     return (InputJobInfo) HCatUtil.deserialize(jobString);
+   }
+ 
+-  private void setInputPath(JobConf jobConf, String location)
++  private List<String> setInputPath(JobConf jobConf, String location)
+     throws IOException {
+ 
+     // ideally we should just call FileInputFormat.setInputPaths() here - but
+@@ -322,19 +325,33 @@ private void setInputPath(JobConf jobConf, String location)
+     }
+     pathStrings.add(location.substring(pathStart, length));
+ 
+-    Path[] paths = StringUtils.stringToPath(pathStrings.toArray(new String[0]));
+     String separator = "";
+     StringBuilder str = new StringBuilder();
+ 
+-    for (Path path : paths) {
++    boolean ignoreInvalidPath =jobConf.getBoolean(HCatConstants.HCAT_INPUT_IGNORE_INVALID_PATH_KEY,
++        HCatConstants.HCAT_INPUT_IGNORE_INVALID_PATH_DEFAULT);
++    Iterator<String> pathIterator = pathStrings.iterator();
++    while (pathIterator.hasNext()) {
++      String pathString = pathIterator.next();
++      if (ignoreInvalidPath && org.apache.commons.lang.StringUtils.isBlank(pathString)) {
++        continue;
++      }
++      Path path = new Path(pathString);
+       FileSystem fs = path.getFileSystem(jobConf);
++      if (ignoreInvalidPath && !fs.exists(path)) {
++        pathIterator.remove();
++        continue;
++      }
+       final String qualifiedPath = fs.makeQualified(path).toString();
+       str.append(separator)
+         .append(StringUtils.escapeString(qualifiedPath));
+       separator = StringUtils.COMMA_STR;
+     }
+ 
+-    jobConf.set("mapred.input.dir", str.toString());
++    if (!ignoreInvalidPath || !pathStrings.isEmpty()) {
++      jobConf.set("mapred.input.dir", str.toString());
++    }
++    return pathStrings;
+   }
+ 
+ }
+diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
+index 2440cb5..4e23fa2 100644
+--- a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
++++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
+@@ -66,6 +66,7 @@
+ import org.apache.pig.data.Tuple;
+ import org.apache.pig.impl.logicalLayer.schema.Schema;
+ import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
++import org.apache.pig.impl.util.PropertiesUtil;
+ import org.joda.time.DateTime;
+ import org.junit.After;
+ import org.junit.Before;
+@@ -102,6 +103,7 @@
+           add("testReadPartitionedBasic");
+           add("testProjectionsBasic");
+           add("testColumnarStorePushdown2");
++          add("testReadMissingPartitionBasicNeg");
+         }});
+       }};
+ 
+@@ -438,6 +440,59 @@ public void testReadPartitionedBasic() throws IOException, CommandNeedRetryExcep
+   }
+ 
+   @Test
++  public void testReadMissingPartitionBasicNeg() throws IOException, CommandNeedRetryException {
++    assumeTrue(!TestUtil.shouldSkip(storageFormat, DISABLED_STORAGE_FORMATS));
++    PigServer server = new PigServer(ExecType.LOCAL);
++
++    File removedPartitionDir = new File(TEST_WAREHOUSE_DIR + "/" + PARTITIONED_TABLE + "/bkt=0");
++    if (!removeDirectory(removedPartitionDir)) {
++      System.out.println("Test did not run because its environment could not be set.");
++      return;
++    }
++    driver.run("select * from " + PARTITIONED_TABLE);
++    ArrayList<String> valuesReadFromHiveDriver = new ArrayList<String>();
++    driver.getResults(valuesReadFromHiveDriver);
++    assertTrue(valuesReadFromHiveDriver.size() == 6);
++
++    server.registerQuery("W = load '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatLoader();");
++    Schema dumpedWSchema = server.dumpSchema("W");
++    List<FieldSchema> Wfields = dumpedWSchema.getFields();
++    assertEquals(3, Wfields.size());
++    assertTrue(Wfields.get(0).alias.equalsIgnoreCase("a"));
++    assertTrue(Wfields.get(0).type == DataType.INTEGER);
++    assertTrue(Wfields.get(1).alias.equalsIgnoreCase("b"));
++    assertTrue(Wfields.get(1).type == DataType.CHARARRAY);
++    assertTrue(Wfields.get(2).alias.equalsIgnoreCase("bkt"));
++    assertTrue(Wfields.get(2).type == DataType.CHARARRAY);
++
++    try {
++      Iterator<Tuple> WIter = server.openIterator("W");
++      fail("Should failed in retriving an invalid partition");
++    } catch (IOException ioe) {
++      // expected
++    }
++  }
++
++  private static boolean removeDirectory(File dir) {
++    boolean success = false;
++    if (dir.isDirectory()) {
++      File[] files = dir.listFiles();
++      if (files != null && files.length > 0) {
++        for (File file : files) {
++          success = removeDirectory(file);
++          if (!success) {
++            return false;
++          }
++        }
++      }
++      success = dir.delete();
++    } else {
++        success = dir.delete();
++    }
++    return success;
++  }
++
++  @Test
+   public void testProjectionsBasic() throws IOException {
+     assumeTrue(!TestUtil.shouldSkip(storageFormat, DISABLED_STORAGE_FORMATS));
+ 
+diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderWithProps.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderWithProps.java
+new file mode 100644
+index 0000000..41fe79b
+--- /dev/null
++++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderWithProps.java
+@@ -0,0 +1,305 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++package org.apache.hive.hcatalog.pig;
++
++import java.io.File;
++import java.io.FileWriter;
++import java.io.IOException;
++import java.io.PrintWriter;
++import java.io.RandomAccessFile;
++import java.sql.Date;
++import java.sql.Timestamp;
++import java.util.ArrayList;
++import java.util.Collection;
++import java.util.HashMap;
++import java.util.HashSet;
++import java.util.Iterator;
++import java.util.List;
++import java.util.Map;
++import java.util.Properties;
++import java.util.Set;
++
++import org.apache.commons.io.FileUtils;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.FileUtil;
++import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.hive.cli.CliSessionState;
++import org.apache.hadoop.hive.conf.HiveConf;
++import org.apache.hadoop.hive.ql.CommandNeedRetryException;
++import org.apache.hadoop.hive.ql.Driver;
++import org.apache.hadoop.hive.ql.WindowsPathUtil;
++import org.apache.hadoop.hive.ql.io.IOConstants;
++import org.apache.hadoop.hive.ql.io.StorageFormats;
++import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
++import org.apache.hadoop.hive.ql.session.SessionState;
++import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
++import org.apache.hadoop.mapreduce.Job;
++import org.apache.hadoop.util.Shell;
++import org.apache.hive.hcatalog.HcatTestUtils;
++import org.apache.hive.hcatalog.common.HCatUtil;
++import org.apache.hive.hcatalog.common.HCatConstants;
++import org.apache.hive.hcatalog.data.Pair;
++import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
++import org.apache.pig.ExecType;
++import org.apache.pig.PigRunner;
++import org.apache.pig.PigServer;
++import org.apache.pig.ResourceStatistics;
++import org.apache.pig.tools.pigstats.OutputStats;
++import org.apache.pig.tools.pigstats.PigStats;
++import org.apache.pig.data.DataType;
++import org.apache.pig.data.Tuple;
++import org.apache.pig.impl.logicalLayer.schema.Schema;
++import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
++import org.apache.pig.impl.util.PropertiesUtil;
++import org.joda.time.DateTime;
++import org.junit.After;
++import org.junit.Before;
++import org.junit.Test;
++import org.junit.runner.RunWith;
++import org.junit.runners.Parameterized;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import static org.junit.Assert.*;
++import static org.junit.Assume.assumeTrue;
++
++@RunWith(Parameterized.class)
++public class TestHCatLoaderWithProps {
++  private static final Logger LOG = LoggerFactory.getLogger(TestHCatLoaderWithProps.class);
++  private static final String TEST_DATA_DIR = HCatUtil.makePathASafeFileName(System.getProperty("java.io.tmpdir") +
++      File.separator + TestHCatLoaderWithProps.class.getCanonicalName() + "-" + System.currentTimeMillis());
++  private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
++  private static final String BASIC_FILE_NAME = TEST_DATA_DIR + "/basic.input.data";
++
++  private static final String BASIC_TABLE = "junit_unparted_basic";
++  private static final String PARTITIONED_TABLE = "junit_parted_basic";
++
++  private Driver driver;
++  private Map<Integer, Pair<Integer, String>> basicInputData;
++
++  private static final Map<String, Set<String>> DISABLED_STORAGE_FORMATS =
++      new HashMap<String, Set<String>>() {{
++        put(IOConstants.PARQUETFILE, new HashSet<String>() {{
++          add("testReadMissingPartitionBasic");
++        }});
++      }};
++
++  private final String storageFormat;
++
++  @Parameterized.Parameters
++  public static Collection<Object[]> generateParameters() {
++    return StorageFormats.names();
++  }
++
++  public TestHCatLoaderWithProps(String storageFormat) {
++    this.storageFormat = storageFormat;
++  }
++
++  private void dropTable(String tablename) throws IOException, CommandNeedRetryException {
++    dropTable(tablename, driver);
++  }
++
++  static void dropTable(String tablename, Driver driver) throws IOException, CommandNeedRetryException {
++    driver.run("drop table if exists " + tablename);
++  }
++
++  private void createTable(String tablename, String schema, String partitionedBy) throws IOException, CommandNeedRetryException {
++    createTable(tablename, schema, partitionedBy, driver, storageFormat);
++  }
++
++  static void createTable(String tablename, String schema, String partitionedBy, Driver driver, String storageFormat)
++      throws IOException, CommandNeedRetryException {
++    String createTable;
++    createTable = "create table " + tablename + "(" + schema + ") ";
++    if ((partitionedBy != null) && (!partitionedBy.trim().isEmpty())) {
++      createTable = createTable + "partitioned by (" + partitionedBy + ") ";
++    }
++    createTable = createTable + "stored as " +storageFormat;
++    executeStatementOnDriver(createTable, driver);
++  }
++
++  private void createTable(String tablename, String schema) throws IOException, CommandNeedRetryException {
++    createTable(tablename, schema, null);
++  }
++
++  /**
++   * Execute Hive CLI statement
++   * @param cmd arbitrary statement to execute
++   */
++  static void executeStatementOnDriver(String cmd, Driver driver) throws IOException, CommandNeedRetryException {
++    LOG.debug("Executing: " + cmd);
++    CommandProcessorResponse cpr = driver.run(cmd);
++    if(cpr.getResponseCode() != 0) {
++      throw new IOException("Failed to execute \"" + cmd + "\". Driver returned " + cpr.getResponseCode() + " Error: " + cpr.getErrorMessage());
++    }
++  }
++
++  @Before
++  public void setup() throws Exception {
++    File f = new File(TEST_WAREHOUSE_DIR);
++    if (f.exists()) {
++      FileUtil.fullyDelete(f);
++    }
++    if (!(new File(TEST_WAREHOUSE_DIR).mkdirs())) {
++      throw new RuntimeException("Could not create " + TEST_WAREHOUSE_DIR);
++    }
++
++    HiveConf hiveConf = new HiveConf(this.getClass());
++    hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
++    hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
++    hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
++    hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR);
++    hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict");
++
++    if (Shell.WINDOWS) {
++      WindowsPathUtil.convertPathsFromWindowsToHdfs(hiveConf);
++    }
++
++    driver = new Driver(hiveConf);
++    SessionState.start(new CliSessionState(hiveConf));
++
++    createTable(BASIC_TABLE, "a int, b string");
++    createTable(PARTITIONED_TABLE, "a int, b string", "bkt string");
++
++    int LOOP_SIZE = 3;
++    String[] input = new String[LOOP_SIZE * LOOP_SIZE];
++    basicInputData = new HashMap<Integer, Pair<Integer, String>>();
++    int k = 0;
++    for (int i = 1; i <= LOOP_SIZE; i++) {
++      String si = i + "";
++      for (int j = 1; j <= LOOP_SIZE; j++) {
++        String sj = "S" + j + "S";
++        input[k] = si + "\t" + sj;
++        basicInputData.put(k, new Pair<Integer, String>(i, sj));
++        k++;
++      }
++    }
++    HcatTestUtils.createTestDataFile(BASIC_FILE_NAME, input);
++
++    PigServer server = new PigServer(ExecType.LOCAL);
++    server.setBatchOn();
++    int i = 0;
++    server.registerQuery("A = load '" + BASIC_FILE_NAME + "' as (a:int, b:chararray);", ++i);
++
++    server.registerQuery("store A into '" + BASIC_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer();", ++i);
++    server.registerQuery("B = foreach A generate a,b;", ++i);
++    server.registerQuery("B2 = filter B by a < 2;", ++i);
++    server.registerQuery("store B2 into '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer('bkt=0');", ++i);
++
++    server.registerQuery("C = foreach A generate a,b;", ++i);
++    server.registerQuery("C2 = filter C by a >= 2;", ++i);
++    server.registerQuery("store C2 into '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer('bkt=1');", ++i);
++
++    server.executeBatch();
++  }
++
++  @After
++  public void tearDown() throws Exception {
++    try {
++      if (driver != null) {
++        dropTable(BASIC_TABLE);
++        dropTable(PARTITIONED_TABLE);
++      }
++    } finally {
++      FileUtils.deleteDirectory(new File(TEST_DATA_DIR));
++    }
++  }
++
++  @Test
++  public void testReadMissingPartitionBasic() throws IOException, CommandNeedRetryException {
++    assumeTrue(!TestUtil.shouldSkip(storageFormat, DISABLED_STORAGE_FORMATS));
++    Properties pigProperties = PropertiesUtil.loadDefaultProperties();
++    pigProperties.setProperty("hcat.input.ignore.invalid.path", "true");
++    PigServer server = new PigServer(ExecType.LOCAL, pigProperties);
++
++    File removedPartitionDir = new File(TEST_WAREHOUSE_DIR + "/" + PARTITIONED_TABLE + "/bkt=0");
++    if (!removeDirectory(removedPartitionDir)) {
++      System.out.println("Test did not run because its environment could not be set.");
++      return;
++    }
++    driver.run("select * from " + PARTITIONED_TABLE);
++    ArrayList<String> valuesReadFromHiveDriver = new ArrayList<String>();
++    driver.getResults(valuesReadFromHiveDriver);
++    assertTrue(valuesReadFromHiveDriver.size() == 6);
++
++    server.registerQuery("W = load '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatLoader();");
++    Schema dumpedWSchema = server.dumpSchema("W");
++    List<FieldSchema> Wfields = dumpedWSchema.getFields();
++    assertEquals(3, Wfields.size());
++    assertTrue(Wfields.get(0).alias.equalsIgnoreCase("a"));
++    assertTrue(Wfields.get(0).type == DataType.INTEGER);
++    assertTrue(Wfields.get(1).alias.equalsIgnoreCase("b"));
++    assertTrue(Wfields.get(1).type == DataType.CHARARRAY);
++    assertTrue(Wfields.get(2).alias.equalsIgnoreCase("bkt"));
++    assertTrue(Wfields.get(2).type == DataType.CHARARRAY);
++
++    Iterator<Tuple> WIter = server.openIterator("W");
++    Collection<Pair<Integer, String>> valuesRead = new ArrayList<Pair<Integer, String>>();
++    while (WIter.hasNext()) {
++      Tuple t = WIter.next();
++      assertTrue(t.size() == 3);
++      assertNotNull(t.get(0));
++      assertNotNull(t.get(1));
++      assertNotNull(t.get(2));
++      assertTrue(t.get(0).getClass() == Integer.class);
++      assertTrue(t.get(1).getClass() == String.class);
++      assertTrue(t.get(2).getClass() == String.class);
++      valuesRead.add(new Pair<Integer, String>((Integer) t.get(0), (String) t.get(1)));
++      // the returned partition value is always 1
++      assertEquals("1", t.get(2));
++    }
++    assertEquals(valuesReadFromHiveDriver.size(), valuesRead.size());
++
++    server.registerQuery("P1 = load '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatLoader();");
++    server.registerQuery("P1filter = filter P1 by bkt == '0';");
++    Iterator<Tuple> P1Iter = server.openIterator("P1filter");
++    assertFalse(P1Iter.hasNext());
++
++    server.registerQuery("P2 = load '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatLoader();");
++    server.registerQuery("P2filter = filter P2 by bkt == '1';");
++    Iterator<Tuple> P2Iter = server.openIterator("P2filter");
++    int count2 = 0;
++    while (P2Iter.hasNext()) {
++      Tuple t = P2Iter.next();
++      assertEquals("1", t.get(2));
++      assertTrue(((Integer) t.get(0)) > 1);
++      count2++;
++    }
++    assertEquals(6, count2);
++  }
++
++  private static boolean removeDirectory(File dir) {
++    boolean success = false;
++    if (dir.isDirectory()) {
++      File[] files = dir.listFiles();
++      if (files != null && files.length > 0) {
++        for (File file : files) {
++          success = removeDirectory(file);
++          if (!success) {
++            return false;
++          }
++        }
++      }
++      success = dir.delete();
++    } else {
++      success = dir.delete();
++    }
++    return success;
++  }
++}

http://git-wip-us.apache.org/repos/asf/hive/blob/4377c7ff/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
index 6b03fcb..d165e7e 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
@@ -208,4 +208,7 @@ public final class HCatConstants {
    */
   public static final String HCAT_INPUT_BAD_RECORD_MIN_KEY = "hcat.input.bad.record.min";
   public static final int HCAT_INPUT_BAD_RECORD_MIN_DEFAULT = 2;
+
+  public static final String HCAT_INPUT_IGNORE_INVALID_PATH_KEY = "hcat.input.ignore.invalid.path";
+  public static final boolean HCAT_INPUT_IGNORE_INVALID_PATH_DEFAULT = false;
 }
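
The new key is read from the job configuration, so a client opts in the same
way the TestHCatLoaderWithProps test above does. A minimal sketch, assuming a
local Pig run and a placeholder partitioned table named 'junit_parted_basic':

    import java.util.Properties;

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.impl.util.PropertiesUtil;

    public class IgnoreInvalidPathExample {
      public static void main(String[] args) throws Exception {
        Properties pigProperties = PropertiesUtil.loadDefaultProperties();
        // Matches HCAT_INPUT_IGNORE_INVALID_PATH_KEY above; the default (false)
        // preserves the old fail-fast behavior.
        pigProperties.setProperty("hcat.input.ignore.invalid.path", "true");
        PigServer server = new PigServer(ExecType.LOCAL, pigProperties);
        server.registerQuery("W = load 'junit_parted_basic' using "
            + "org.apache.hive.hcatalog.pig.HCatLoader();");
        server.openIterator("W"); // partitions with missing directories are skipped
      }
    }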

http://git-wip-us.apache.org/repos/asf/hive/blob/4377c7ff/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
----------------------------------------------------------------------
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
index adfaf4e..dbbdd61 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatBaseInputFormat.java
@@ -21,11 +21,11 @@ package org.apache.hive.hcatalog.mapreduce;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -127,7 +127,10 @@ public abstract class HCatBaseInputFormat
     //For each matching partition, call getSplits on the underlying InputFormat
     for (PartInfo partitionInfo : partitionInfoList) {
       jobConf = HCatUtil.getJobConfFromContext(jobContext);
-      setInputPath(jobConf, partitionInfo.getLocation());
+      List<String> setInputPath = setInputPath(jobConf, partitionInfo.getLocation());
+      if (setInputPath.isEmpty()) {
+        continue;
+      }
       Map<String, String> jobProperties = partitionInfo.getJobProperties();
 
       HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
@@ -281,7 +284,7 @@ public abstract class HCatBaseInputFormat
     return (InputJobInfo) HCatUtil.deserialize(jobString);
   }
 
-  private void setInputPath(JobConf jobConf, String location)
+  private List<String> setInputPath(JobConf jobConf, String location)
     throws IOException {
 
     // ideally we should just call FileInputFormat.setInputPaths() here - but
@@ -322,19 +325,33 @@ public abstract class HCatBaseInputFormat
     }
     pathStrings.add(location.substring(pathStart, length));
 
-    Path[] paths = StringUtils.stringToPath(pathStrings.toArray(new String[0]));
     String separator = "";
     StringBuilder str = new StringBuilder();
 
-    for (Path path : paths) {
+    boolean ignoreInvalidPath = jobConf.getBoolean(HCatConstants.HCAT_INPUT_IGNORE_INVALID_PATH_KEY,
+        HCatConstants.HCAT_INPUT_IGNORE_INVALID_PATH_DEFAULT);
+    Iterator<String> pathIterator = pathStrings.iterator();
+    while (pathIterator.hasNext()) {
+      String pathString = pathIterator.next();
+      if (ignoreInvalidPath && org.apache.commons.lang.StringUtils.isBlank(pathString)) {
+        continue;
+      }
+      Path path = new Path(pathString);
       FileSystem fs = path.getFileSystem(jobConf);
+      if (ignoreInvalidPath && !fs.exists(path)) {
+        pathIterator.remove();
+        continue;
+      }
       final String qualifiedPath = fs.makeQualified(path).toString();
       str.append(separator)
         .append(StringUtils.escapeString(qualifiedPath));
       separator = StringUtils.COMMA_STR;
     }
 
-    jobConf.set("mapred.input.dir", str.toString());
+    if (!ignoreInvalidPath || !pathStrings.isEmpty()) {
+      jobConf.set("mapred.input.dir", str.toString());
+    }
+    return pathStrings;
   }
 
 }
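
The heart of the change is the filtering loop added to setInputPath(): when
the flag is on, blank or nonexistent locations are dropped before
"mapred.input.dir" is set, and getSplits() skips any partition whose path
list comes back empty. A self-contained sketch of that filtering step, using
a plain Hadoop Configuration in place of the full HCat job setup:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class InvalidPathFilterSketch {
      // Returns the qualified paths that survive filtering, mutating
      // pathStrings the same way the loop in setInputPath() does.
      static List<String> filterPaths(Configuration conf, List<String> pathStrings,
          boolean ignoreInvalidPath) throws IOException {
        List<String> qualified = new ArrayList<String>();
        Iterator<String> pathIterator = pathStrings.iterator();
        while (pathIterator.hasNext()) {
          String pathString = pathIterator.next();
          if (ignoreInvalidPath && StringUtils.isBlank(pathString)) {
            continue; // skip blank segments; new Path("") would throw
          }
          Path path = new Path(pathString);
          FileSystem fs = path.getFileSystem(conf);
          if (ignoreInvalidPath && !fs.exists(path)) {
            pathIterator.remove(); // drop partitions whose directories are gone
            continue;
          }
          qualified.add(fs.makeQualified(path).toString());
        }
        return qualified;
      }
    }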

http://git-wip-us.apache.org/repos/asf/hive/blob/4377c7ff/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
----------------------------------------------------------------------
diff --git a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
index 2440cb5..4e23fa2 100644
--- a/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
+++ b/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
@@ -66,6 +66,7 @@ import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.util.PropertiesUtil;
 import org.joda.time.DateTime;
 import org.junit.After;
 import org.junit.Before;
@@ -102,6 +103,7 @@ public class TestHCatLoader {
           add("testReadPartitionedBasic");
           add("testProjectionsBasic");
           add("testColumnarStorePushdown2");
+          add("testReadMissingPartitionBasicNeg");
         }});
       }};
 
@@ -438,6 +440,59 @@ public class TestHCatLoader {
   }
 
   @Test
+  public void testReadMissingPartitionBasicNeg() throws IOException, CommandNeedRetryException {
+    assumeTrue(!TestUtil.shouldSkip(storageFormat, DISABLED_STORAGE_FORMATS));
+    PigServer server = new PigServer(ExecType.LOCAL);
+
+    File removedPartitionDir = new File(TEST_WAREHOUSE_DIR + "/" + PARTITIONED_TABLE + "/bkt=0");
+    if (!removeDirectory(removedPartitionDir)) {
+      System.out.println("Test did not run because its environment could not be set.");
+      return;
+    }
+    driver.run("select * from " + PARTITIONED_TABLE);
+    ArrayList<String> valuesReadFromHiveDriver = new ArrayList<String>();
+    driver.getResults(valuesReadFromHiveDriver);
+    assertTrue(valuesReadFromHiveDriver.size() == 6);
+
+    server.registerQuery("W = load '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatLoader();");
+    Schema dumpedWSchema = server.dumpSchema("W");
+    List<FieldSchema> Wfields = dumpedWSchema.getFields();
+    assertEquals(3, Wfields.size());
+    assertTrue(Wfields.get(0).alias.equalsIgnoreCase("a"));
+    assertTrue(Wfields.get(0).type == DataType.INTEGER);
+    assertTrue(Wfields.get(1).alias.equalsIgnoreCase("b"));
+    assertTrue(Wfields.get(1).type == DataType.CHARARRAY);
+    assertTrue(Wfields.get(2).alias.equalsIgnoreCase("bkt"));
+    assertTrue(Wfields.get(2).type == DataType.CHARARRAY);
+
+    try {
+      Iterator<Tuple> WIter = server.openIterator("W");
+      fail("Should failed in retriving an invalid partition");
+    } catch (IOException ioe) {
+      // expected
+    }
+  }
+
+  private static boolean removeDirectory(File dir) {
+    boolean success = false;
+    if (dir.isDirectory()) {
+      File[] files = dir.listFiles();
+      if (files != null && files.length > 0) {
+        for (File file : files) {
+          success = removeDirectory(file);
+          if (!success) {
+            return false;
+          }
+        }
+      }
+      success = dir.delete();
+    } else {
+      success = dir.delete();
+    }
+    return success;
+  }
+
+  @Test
   public void testProjectionsBasic() throws IOException {
     assumeTrue(!TestUtil.shouldSkip(storageFormat, DISABLED_STORAGE_FORMATS));
 


[07/11] hive git commit: HIVE-13596 : HS2 should be able to get UDFs on demand from metastore (Sergey Shelukhin, reviewed by Jason Dere)

Posted by jd...@apache.org.
HIVE-13596 : HS2 should be able to get UDFs on demand from metastore (Sergey Shelukhin, reviewed by Jason Dere)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/6460529a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/6460529a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/6460529a

Branch: refs/heads/llap
Commit: 6460529aa910d7dd4af499d4adf85cbcf67452ce
Parents: 1289aff
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Apr 29 11:01:34 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Apr 29 11:01:34 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   3 +
 .../apache/hadoop/hive/ql/exec/Registry.java    | 323 ++++++++++++-------
 .../hadoop/hive/ql/session/SessionState.java    |   3 +-
 3 files changed, 218 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/6460529a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index db4b9e8..46a3b96 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2322,6 +2322,9 @@ public class HiveConf extends Configuration {
     HIVE_SERVER2_BUILTIN_UDF_BLACKLIST("hive.server2.builtin.udf.blacklist", "",
          "Comma separated list of udfs names. These udfs will not be allowed in queries." +
          " The udf black list takes precedence over udf white list"),
+    HIVE_ALLOW_UDF_LOAD_ON_DEMAND("hive.allow.udf.load.on.demand", false,
+        "Whether to enable loading UDFs from the metastore on demand; this is mostly relevant for\n" +
+        "HS2 and was the default behavior before Hive 1.2. Off by default."),
 
     HIVE_SERVER2_SESSION_CHECK_INTERVAL("hive.server2.session.check.interval", "6h",
         new TimeValidator(TimeUnit.MILLISECONDS, 3000l, true, null, false),
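
A minimal sketch of flipping the new flag on programmatically, assuming an
embedded HiveConf; in a deployed HiveServer2 the same key would normally be
set in hive-site.xml:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

    public class UdfLoadOnDemandConfig {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Off by default; when enabled, the system registry falls back to
        // the metastore for permanent UDFs it does not yet know about.
        conf.setBoolVar(ConfVars.HIVE_ALLOW_UDF_LOAD_ON_DEMAND, true);
        System.out.println(
            HiveConf.getBoolVar(conf, ConfVars.HIVE_ALLOW_UDF_LOAD_ON_DEMAND)); // true
      }
    }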

http://git-wip-us.apache.org/repos/asf/hive/blob/6460529a/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java
index d5f4a37..3b54b49 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Registry.java
@@ -24,8 +24,12 @@ import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.ql.exec.FunctionInfo.FunctionResource;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -46,7 +50,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import java.io.IOException;
-import java.net.URLClassLoader;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -56,6 +59,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
 
@@ -80,14 +84,16 @@ public class Registry {
   private final Set<ClassLoader> mSessionUDFLoaders = new LinkedHashSet<ClassLoader>();
 
   private final boolean isNative;
+  /**
+   * The epic lock for the registry. This was added to replace the synchronized methods with
+   * minimum disruption; the locking should really be made more granular here.
+   */
+  private final ReentrantLock lock = new ReentrantLock();
 
-  Registry(boolean isNative) {
+  public Registry(boolean isNative) {
     this.isNative = isNative;
   }
 
-  public Registry() {
-    this(false);
-  }
 
   /**
    * Registers the appropriate kind of temporary function based on a class's
@@ -157,11 +163,16 @@ public class Registry {
    * Registers the UDF class as a built-in function; used for dynamically created UDFs, like
    * GenericUDFOP*Minus/Plus.
    */
-  public synchronized void registerHiddenBuiltIn(Class<? extends GenericUDF> functionClass) {
-    if (!isNative) {
-      throw new RuntimeException("Builtin is not for this registry");
+  public void registerHiddenBuiltIn(Class<? extends GenericUDF> functionClass) {
+    lock.lock();
+    try {
+      if (!isNative) {
+        throw new RuntimeException("Builtin is not for this registry");
+      }
+      builtIns.add(functionClass);
+    } finally {
+      lock.unlock();
     }
-    builtIns.add(functionClass);
   }
 
   public FunctionInfo registerGenericUDTF(String functionName,
@@ -256,23 +267,29 @@ public class Registry {
    * @param functionName
    * @return
    */
-  public synchronized FunctionInfo getFunctionInfo(String functionName) throws SemanticException {
-    functionName = functionName.toLowerCase();
-    if (FunctionUtils.isQualifiedFunctionName(functionName)) {
-      return getQualifiedFunctionInfo(functionName);
-    }
-    // First try without qualifiers - would resolve builtin/temp functions.
-    // Otherwise try qualifying with current db name.
-    FunctionInfo functionInfo = mFunctions.get(functionName);
-    if (functionInfo != null && functionInfo.isBlockedFunction()) {
-      throw new SemanticException ("UDF " + functionName + " is not allowed");
-    }
-    if (functionInfo == null) {
-      String qualifiedName = FunctionUtils.qualifyFunctionName(
-          functionName, SessionState.get().getCurrentDatabase().toLowerCase());
-      functionInfo = getQualifiedFunctionInfo(qualifiedName);
-    }
+  public FunctionInfo getFunctionInfo(String functionName) throws SemanticException {
+    lock.lock();
+    try {
+      functionName = functionName.toLowerCase();
+      if (FunctionUtils.isQualifiedFunctionName(functionName)) {
+        return getQualifiedFunctionInfoUnderLock(functionName);
+      }
+      // First try without qualifiers - would resolve builtin/temp functions.
+      // Otherwise try qualifying with current db name.
+      FunctionInfo functionInfo = mFunctions.get(functionName);
+      if (functionInfo != null && functionInfo.isBlockedFunction()) {
+        throw new SemanticException ("UDF " + functionName + " is not allowed");
+      }
+      if (functionInfo == null) {
+        String qualifiedName = FunctionUtils.qualifyFunctionName(
+            functionName, SessionState.get().getCurrentDatabase().toLowerCase());
+        functionInfo = getQualifiedFunctionInfoUnderLock(qualifiedName);
+      }
     return functionInfo;
+    } finally {
+      lock.unlock();
+    }
+
   }
 
   public WindowFunctionInfo getWindowFunctionInfo(String functionName) throws SemanticException {
@@ -295,15 +312,23 @@ public class Registry {
     return udfClass != null && persistent.containsKey(udfClass);
   }
 
-  public synchronized Set<String> getCurrentFunctionNames() {
-    return getFunctionNames((Pattern)null);
+  public Set<String> getCurrentFunctionNames() {
+    lock.lock();
+    try {
+      return getFunctionNames((Pattern)null);
+    } finally {
+      lock.unlock();
+    }
   }
 
-  public synchronized Set<String> getFunctionNames(String funcPatternStr) {
+  public Set<String> getFunctionNames(String funcPatternStr) {
+    lock.lock();
     try {
       return getFunctionNames(Pattern.compile(funcPatternStr));
     } catch (PatternSyntaxException e) {
       return Collections.emptySet();
+    } finally {
+      lock.unlock();
     }
   }
 
@@ -315,17 +340,22 @@ public class Registry {
    * @param funcPattern regular expression of the interested function names
    * @return set of strings contains function names
    */
-  public synchronized Set<String> getFunctionNames(Pattern funcPattern) {
-    Set<String> funcNames = new TreeSet<String>();
-    for (String funcName : mFunctions.keySet()) {
-      if (funcName.contains(WINDOW_FUNC_PREFIX)) {
-        continue;
-      }
-      if (funcPattern == null || funcPattern.matcher(funcName).matches()) {
-        funcNames.add(funcName);
+  public Set<String> getFunctionNames(Pattern funcPattern) {
+    lock.lock();
+    try {
+      Set<String> funcNames = new TreeSet<String>();
+      for (String funcName : mFunctions.keySet()) {
+        if (funcName.contains(WINDOW_FUNC_PREFIX)) {
+          continue;
+        }
+        if (funcPattern == null || funcPattern.matcher(funcName).matches()) {
+          funcNames.add(funcName);
+        }
       }
+      return funcNames;
+    } finally {
+      lock.unlock();
     }
-    return funcNames;
   }
 
   /**
@@ -334,18 +364,23 @@ public class Registry {
    * @param funcInfo
    * @param synonyms
    */
-  public synchronized void getFunctionSynonyms(
+  public void getFunctionSynonyms(
       String funcName, FunctionInfo funcInfo, Set<String> synonyms) throws SemanticException {
-    Class<?> funcClass = funcInfo.getFunctionClass();
-    for (Map.Entry<String, FunctionInfo> entry : mFunctions.entrySet()) {
-      String name = entry.getKey();
-      if (name.contains(WINDOW_FUNC_PREFIX) || name.equals(funcName)) {
-        continue;
-      }
-      FunctionInfo function = entry.getValue();
-      if (function.getFunctionClass() == funcClass) {
-        synonyms.add(name);
+    lock.lock();
+    try {
+      Class<?> funcClass = funcInfo.getFunctionClass();
+      for (Map.Entry<String, FunctionInfo> entry : mFunctions.entrySet()) {
+        String name = entry.getKey();
+        if (name.contains(WINDOW_FUNC_PREFIX) || name.equals(funcName)) {
+          continue;
+        }
+        FunctionInfo function = entry.getValue();
+        if (function.getFunctionClass() == funcClass) {
+          synonyms.add(name);
+        }
       }
+    } finally {
+      lock.unlock();
     }
   }
 
@@ -409,26 +444,31 @@ public class Registry {
     return ((GenericUDAFResolver2) udafResolver).getEvaluator(paramInfo);
   }
 
-  private synchronized void addFunction(String functionName, FunctionInfo function) {
-    if (isNative ^ function.isNative()) {
-      throw new RuntimeException("Function " + functionName + " is not for this registry");
-    }
-    functionName = functionName.toLowerCase();
-    FunctionInfo prev = mFunctions.get(functionName);
-    if (prev != null) {
-      if (isBuiltInFunc(prev.getFunctionClass())) {
-        throw new RuntimeException("Function " + functionName + " is hive builtin function, " +
-            "which cannot be overriden.");
+  private void addFunction(String functionName, FunctionInfo function) {
+    lock.lock();
+    try {
+      if (isNative != function.isNative()) {
+        throw new RuntimeException("Function " + functionName + " is not for this registry");
       }
-      prev.discarded();
-    }
-    mFunctions.put(functionName, function);
-    if (function.isBuiltIn()) {
-      builtIns.add(function.getFunctionClass());
-    } else if (function.isPersistent()) {
-      Class<?> functionClass = getPermanentUdfClass(function);
-      Integer refCount = persistent.get(functionClass);
-      persistent.put(functionClass, Integer.valueOf(refCount == null ? 1 : refCount + 1));
+      functionName = functionName.toLowerCase();
+      FunctionInfo prev = mFunctions.get(functionName);
+      if (prev != null) {
+        if (isBuiltInFunc(prev.getFunctionClass())) {
+          throw new RuntimeException("Function " + functionName + " is hive builtin function, " +
+              "which cannot be overriden.");
+        }
+        prev.discarded();
+      }
+      mFunctions.put(functionName, function);
+      if (function.isBuiltIn()) {
+        builtIns.add(function.getFunctionClass());
+      } else if (function.isPersistent()) {
+        Class<?> functionClass = getPermanentUdfClass(function);
+        Integer refCount = persistent.get(functionClass);
+        persistent.put(functionClass, Integer.valueOf(refCount == null ? 1 : refCount + 1));
+      }
+    } finally {
+      lock.unlock();
     }
   }
 
@@ -446,23 +486,27 @@ public class Registry {
     return functionClass;
   }
 
-  public synchronized void unregisterFunction(String functionName) throws HiveException {
-    functionName = functionName.toLowerCase();
-    FunctionInfo fi = mFunctions.get(functionName);
-    if (fi != null) {
-      if (fi.isBuiltIn()) {
-        throw new HiveException(ErrorMsg.DROP_NATIVE_FUNCTION.getMsg(functionName));
-      }
-      mFunctions.remove(functionName);
-      fi.discarded();
-      if (fi.isPersistent()) {
-        removePersistentFunction(fi);
+  public void unregisterFunction(String functionName) throws HiveException {
+    lock.lock();
+    try {
+      functionName = functionName.toLowerCase();
+      FunctionInfo fi = mFunctions.get(functionName);
+      if (fi != null) {
+        if (fi.isBuiltIn()) {
+          throw new HiveException(ErrorMsg.DROP_NATIVE_FUNCTION.getMsg(functionName));
+        }
+        mFunctions.remove(functionName);
+        fi.discarded();
+        if (fi.isPersistent()) {
+          removePersistentFunctionUnderLock(fi);
+        }
       }
+    } finally {
+      lock.unlock();
     }
   }
 
-  /** Should only be called from synchronized methods. */
-  private void removePersistentFunction(FunctionInfo fi) {
+  private void removePersistentFunctionUnderLock(FunctionInfo fi) {
     Class<?> functionClass = getPermanentUdfClass(fi);
     Integer refCount = persistent.get(functionClass);
     assert refCount != null;
@@ -478,10 +522,15 @@ public class Registry {
    * @param dbName database name
    * @throws HiveException
    */
-  public synchronized void unregisterFunctions(String dbName) throws HiveException {
-    Set<String> funcNames = getFunctionNames(dbName.toLowerCase() + "\\..*");
-    for (String funcName : funcNames) {
-      unregisterFunction(funcName);
+  public void unregisterFunctions(String dbName) throws HiveException {
+    lock.lock();
+    try {
+      Set<String> funcNames = getFunctionNames(dbName.toLowerCase() + "\\..*");
+      for (String funcName : funcNames) {
+        unregisterFunction(funcName);
+      }
+    } finally {
+      lock.unlock();
     }
   }
 
@@ -493,7 +542,7 @@ public class Registry {
     return null;
   }
 
-  private FunctionInfo getQualifiedFunctionInfo(String qualifiedName) throws SemanticException {
+  private FunctionInfo getQualifiedFunctionInfoUnderLock(String qualifiedName) throws SemanticException {
     FunctionInfo info = mFunctions.get(qualifiedName);
     if (info != null && info.isBlockedFunction()) {
       throw new SemanticException ("UDF " + qualifiedName + " is not allowed");
@@ -509,7 +558,24 @@ public class Registry {
     if (isNative && info != null && info.isPersistent()) {
       return registerToSessionRegistry(qualifiedName, info);
     }
-    return info;
+    if (info != null || !isNative) {
+      return info; // We have the UDF, or we are in the session registry (or both).
+    }
+    // If we are in the system registry and this feature is enabled, try to get it from metastore.
+    SessionState ss = SessionState.get();
+    HiveConf conf = (ss == null) ? null : ss.getConf();
+    if (conf == null || !HiveConf.getBoolVar(conf, ConfVars.HIVE_ALLOW_UDF_LOAD_ON_DEMAND)) {
+      return null;
+    }
+    // This is a little bit weird. We'll do the MS call outside of the lock. Our caller calls us
+    // under lock, so we'd preserve the lock state for them; their finally block will release the
+    // lock correctly. See the comment on the lock field - the locking needs to be reworked.
+    lock.unlock();
+    try {
+      return getFunctionInfoFromMetastoreNoLock(qualifiedName, conf);
+    } finally {
+      lock.lock();
+    }
   }
 
   // should be called after session registry is checked
@@ -545,40 +611,52 @@ public class Registry {
     return ret;
   }
 
-  private void checkFunctionClass(FunctionInfo cfi) throws ClassNotFoundException {
-    // This call will fail for non-generic UDFs using GenericUDFBridge
-    Class<?> udfClass = cfi.getFunctionClass();
-    // Even if we have a reference to the class (which will be the case for GenericUDFs),
-    // the classloader may not be able to resolve the class, which would mean reflection-based
-    // methods would fail such as for plan deserialization. Make sure this works too.
-    Class.forName(udfClass.getName(), true, Utilities.getSessionSpecifiedClassLoader());
-  }
-
-  public synchronized void clear() {
-    if (isNative) {
-      throw new IllegalStateException("System function registry cannot be cleared");
+  public void clear() {
+    lock.lock();
+    try {
+      if (isNative) {
+        throw new IllegalStateException("System function registry cannot be cleared");
+      }
+      mFunctions.clear();
+      builtIns.clear();
+      persistent.clear();
+    } finally {
+      lock.unlock();
     }
-    mFunctions.clear();
-    builtIns.clear();
-    persistent.clear();
   }
 
-  public synchronized void closeCUDFLoaders() {
+  public void closeCUDFLoaders() {
+    lock.lock();
     try {
-      for(ClassLoader loader: mSessionUDFLoaders) {
-        JavaUtils.closeClassLoader(loader);
+      try {
+        for(ClassLoader loader: mSessionUDFLoaders) {
+          JavaUtils.closeClassLoader(loader);
+        }
+      } catch (IOException ie) {
+          LOG.error("Error in close loader: " + ie);
       }
-    } catch (IOException ie) {
-        LOG.error("Error in close loader: " + ie);
+      mSessionUDFLoaders.clear();
+    } finally {
+      lock.unlock();
     }
-    mSessionUDFLoaders.clear();
   }
 
-  public synchronized void addToUDFLoaders(ClassLoader loader) {
-    mSessionUDFLoaders.add(loader);
+  public void addToUDFLoaders(ClassLoader loader) {
+    lock.lock();
+    try {
+      mSessionUDFLoaders.add(loader);
+    } finally {
+      lock.unlock();
+    }
   }
-  public synchronized void removeFromUDFLoaders(ClassLoader loader) {
-    mSessionUDFLoaders.remove(loader);
+
+  public void removeFromUDFLoaders(ClassLoader loader) {
+    lock.lock();
+    try {
+      mSessionUDFLoaders.remove(loader);
+    } finally {
+      lock.unlock();
+    }
   }
 
   /**
@@ -611,4 +689,29 @@ public class Registry {
     return blackList.contains(functionName) ||
         (!whiteList.isEmpty() && !whiteList.contains(functionName));
   }
+
+  /**
+   * This is called outside of the lock. Some of the methods that are called transitively by
+   * this (e.g. addFunction) will take the lock again and then release it, which is ok.
+   */
+  private FunctionInfo getFunctionInfoFromMetastoreNoLock(String functionName, HiveConf conf) {
+    try {
+      String[] parts = FunctionUtils.getQualifiedFunctionNameParts(functionName);
+      Function func = Hive.get(conf).getFunction(parts[0].toLowerCase(), parts[1]);
+      if (func == null) {
+        return null;
+      }
+      // Found UDF in metastore - now add it to the function registry.
+      FunctionInfo fi = registerPermanentFunction(functionName, func.getClassName(), true,
+          FunctionTask.toFunctionResource(func.getResourceUris()));
+      if (fi == null) {
+        LOG.error(func.getClassName() + " is not a valid UDF class and was not registered");
+        return null;
+      }
+      return fi;
+    } catch (Throwable e) {
+      LOG.info("Unable to look up " + functionName + " in metastore", e);
+    }
+    return null;
+  }
 }
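
Every converted method in this diff follows the same lock()/try/finally
shape, and the metastore lookup adds one twist: the lock is released around
the remote call and re-acquired before returning, so the caller's own
finally block still releases a held lock. A stripped-down sketch of that
pattern, with placeholder lookup methods standing in for the registry and
metastore calls:

    import java.util.concurrent.locks.ReentrantLock;

    public class LockPatternSketch {
      private final ReentrantLock lock = new ReentrantLock();

      public Object lookup(String name) {
        lock.lock();
        try {
          Object local = findLocally(name);
          if (local != null) {
            return local;
          }
          // Drop the lock around the slow remote call, then re-acquire it
          // so the outer finally block still unlocks a held lock.
          lock.unlock();
          try {
            return fetchRemotely(name);
          } finally {
            lock.lock();
          }
        } finally {
          lock.unlock();
        }
      }

      private Object findLocally(String name) { return null; }   // placeholder
      private Object fetchRemotely(String name) { return name; } // placeholder
    }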

http://git-wip-us.apache.org/repos/asf/hive/blob/6460529a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 672df63..d211eb2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -259,7 +259,7 @@ public class SessionState {
    */
   private final Set<String> preReloadableAuxJars = new HashSet<String>();
 
-  private final Registry registry = new Registry();
+  private final Registry registry;
 
   /**
    * CURRENT_TIMESTAMP value for query
@@ -352,6 +352,7 @@ public class SessionState {
   public SessionState(HiveConf conf, String userName) {
     this.sessionConf = conf;
     this.userName = userName;
+    this.registry = new Registry(false);
     if (LOG.isDebugEnabled()) {
       LOG.debug("SessionState user: " + userName);
     }


[04/11] hive git commit: HIVE-13469. LLAP: Support delayed scheduling for locality. (Siddharth Seth, reviewed by Prasanth Jayachandran, Sergey Shelukhin)

Posted by jd...@apache.org.
HIVE-13469. LLAP: Support delayed scheduling for locality. (Siddharth Seth, reviewed by Prasanth Jayachandran, Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/347a5a55
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/347a5a55
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/347a5a55

Branch: refs/heads/llap
Commit: 347a5a5580742a36a875bd6a5f2ac8acd74d3cbf
Parents: 076f365
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri Apr 29 15:14:27 2016 +0530
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri Apr 29 15:14:27 2016 +0530

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   2 +-
 .../hive/llap/tezplugins/ContainerFactory.java  |   3 +-
 .../tezplugins/LlapTaskSchedulerService.java    | 377 ++++++++--
 .../llap/tezplugins/helpers/MonotonicClock.java |  24 +
 .../scheduler/LoggingFutureCallback.java        |  44 ++
 .../TestLlapTaskSchedulerService.java           | 734 ++++++++++++++++++-
 6 files changed, 1068 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
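
The two small helpers added above, MonotonicClock and LoggingFutureCallback,
do not appear in this excerpt. Judging from the inline FutureCallback blocks
that the LlapTaskSchedulerService hunk below deletes, LoggingFutureCallback
is essentially the following (a reconstruction, not the committed source):

    import com.google.common.util.concurrent.FutureCallback;
    import org.slf4j.Logger;

    // Reconstructed from the inline callbacks this commit removes; the real
    // class lives in org.apache.hadoop.hive.llap.tezplugins.scheduler.
    public class LoggingFutureCallback implements FutureCallback<Void> {
      private final String componentName;
      private final Logger log;

      public LoggingFutureCallback(String componentName, Logger log) {
        this.componentName = componentName;
        this.log = log;
      }

      @Override
      public void onSuccess(Void result) {
        log.info(componentName + " exited");
      }

      @Override
      public void onFailure(Throwable t) {
        log.warn(componentName + " exited with error", t);
      }
    }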


http://git-wip-us.apache.org/repos/asf/hive/blob/347a5a55/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 566e9b6..fd725cb 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2762,7 +2762,7 @@ public class HiveConf extends Configuration {
         new TimeValidator(TimeUnit.MILLISECONDS, -1l, true, Long.MAX_VALUE, true),
         "Amount of time to wait before allocating a request which contains location information," +
             " to a location other than the ones requested. Set to -1 for an infinite delay, 0" +
-            "for a no delay. Currently these are the only two supported values"
+            "for no delay."
     ),
     LLAP_DAEMON_TASK_PREEMPTION_METRICS_INTERVALS(
         "hive.llap.daemon.task.preemption.metrics.intervals", "30,60,300",

http://git-wip-us.apache.org/repos/asf/hive/blob/347a5a55/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java
index a314391..f1feec7 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/ContainerFactory.java
@@ -37,11 +37,10 @@ class ContainerFactory {
   }
 
   public Container createContainer(Resource capability, Priority priority, String hostname,
-      int port) {
+      int port, String nodeHttpAddress) {
     ContainerId containerId =
         ContainerId.newContainerId(customAppAttemptId, nextId.getAndIncrement());
     NodeId nodeId = NodeId.newInstance(hostname, port);
-    String nodeHttpAddress = "hostname:0"; // TODO: include UI ports
 
     Container container =
         Container.newInstance(containerId, nodeId, nodeHttpAddress, capability, priority, null);

http://git-wip-us.apache.org/repos/asf/hive/blob/347a5a55/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index c3d3a1d..da1e17f 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -50,7 +50,6 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.hadoop.conf.Configuration;
@@ -62,6 +61,8 @@ import org.apache.hadoop.hive.llap.registry.ServiceInstance;
 import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
 import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
+import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock;
+import org.apache.hadoop.hive.llap.tezplugins.scheduler.LoggingFutureCallback;
 import org.apache.hadoop.hive.llap.tezplugins.metrics.LlapTaskSchedulerMetrics;
 import org.apache.hadoop.util.JvmPauseMonitor;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -77,7 +78,6 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults;
@@ -91,6 +91,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
 
   private static final Logger LOG = LoggerFactory.getLogger(LlapTaskSchedulerService.class);
 
+  private static final TaskStartComparator TASK_INFO_COMPARATOR = new TaskStartComparator();
+
   private final Configuration conf;
 
   // interface into the registry service
@@ -104,6 +106,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
 
   // Tracks tasks which could not be allocated immediately.
   @VisibleForTesting
+  // Tasks are tracked in the order requests come in, at different priority levels.
+  // TODO HIVE-13538 For tasks at the same priority level, it may be worth attempting to schedule tasks with
+  // locality information before those without locality information
   final TreeMap<Priority, List<TaskInfo>> pendingTasks = new TreeMap<>(new Comparator<Priority>() {
     @Override
     public int compare(Priority o1, Priority o2) {
@@ -113,23 +118,30 @@ public class LlapTaskSchedulerService extends TaskScheduler {
 
   // Tracks running and queued tasks. Cleared after a task completes.
   private final ConcurrentMap<Object, TaskInfo> knownTasks = new ConcurrentHashMap<>();
+  // Tracks tasks which are running. Useful for selecting a task to preempt based on when it started.
   private final TreeMap<Integer, TreeSet<TaskInfo>> runningTasks = new TreeMap<>();
-  private static final TaskStartComparator TASK_INFO_COMPARATOR = new TaskStartComparator();
+
 
   // Queue for disabled nodes. Nodes make it out of this queue when their expiration timeout is hit.
   @VisibleForTesting
   final DelayQueue<NodeInfo> disabledNodesQueue = new DelayQueue<>();
+  @VisibleForTesting
+  final DelayQueue<TaskInfo> delayedTaskQueue = new DelayQueue<>();
 
-  private final boolean forceLocation;
 
   private final ContainerFactory containerFactory;
   private final Random random = new Random();
-  private final Clock clock;
+  @VisibleForTesting
+  final Clock clock;
 
   private final ListeningExecutorService nodeEnabledExecutor;
   private final NodeEnablerCallable nodeEnablerCallable =
       new NodeEnablerCallable();
 
+  private final ListeningExecutorService delayedTaskSchedulerExecutor;
+  @VisibleForTesting
+  final DelayedTaskSchedulerCallable delayedTaskSchedulerCallable;
+
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
   private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
   private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
@@ -147,6 +159,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   private final Map<String, MutableInt> pendingPreemptionsPerHost = new HashMap<>();
 
   private final NodeBlacklistConf nodeBlacklistConf;
+  private final LocalityDelayConf localityDelayConf;
 
   // Per daemon
   private final int memoryPerInstance;
@@ -168,6 +181,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   private final LlapRegistryService registry = new LlapRegistryService(false);
 
   private volatile ListenableFuture<Void> nodeEnablerFuture;
+  private volatile ListenableFuture<Void> delayedTaskSchedulerFuture;
   private volatile ListenableFuture<Void> schedulerFuture;
 
   @VisibleForTesting
@@ -181,7 +195,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   private final JvmPauseMonitor pauseMonitor;
 
   public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) {
-    this(taskSchedulerContext, new SystemClock(), true);
+    this(taskSchedulerContext, new MonotonicClock(), true);
   }
 
   @VisibleForTesting
@@ -189,6 +203,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       boolean initMetrics) {
     super(taskSchedulerContext);
     this.clock = clock;
+    this.delayedTaskSchedulerCallable = createDelayedTaskSchedulerCallable();
     try {
       this.conf = TezUtils.createConfFromUserPayload(taskSchedulerContext.getInitialUserPayload());
     } catch (IOException e) {
@@ -197,6 +212,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     }
     this.containerFactory = new ContainerFactory(taskSchedulerContext.getApplicationAttemptId(),
         taskSchedulerContext.getCustomClusterIdentifier());
+    // TODO HIVE-13483 Get all of these properties from the registry. This will need to take care of different instances
+    // publishing potentially different values when we support changing configurations dynamically.
+    // For now, this can simply be fetched from a single registry instance.
     this.memoryPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB);
     this.coresPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE);
     this.executorsPerInstance = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS);
@@ -212,11 +230,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
 
     long localityDelayMs = HiveConf
         .getTimeVar(conf, ConfVars.LLAP_TASK_SCHEDULER_LOCALITY_DELAY, TimeUnit.MILLISECONDS);
-    if (localityDelayMs == -1) {
-      this.forceLocation = true;
-    } else {
-      this.forceLocation = false;
-    }
+
+    this.localityDelayConf = new LocalityDelayConf(localityDelayMs);
 
     this.timeoutMonitor = new SchedulerTimeoutMonitor();
     this.timeout = HiveConf.getTimeVar(conf,
@@ -240,6 +255,12 @@ public class LlapTaskSchedulerService extends TaskScheduler {
             new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapSchedulerNodeEnabler").build());
     nodeEnabledExecutor = MoreExecutors.listeningDecorator(executorServiceRaw);
 
+    ExecutorService delayedTaskSchedulerExecutorRaw = Executors.newFixedThreadPool(1,
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapSchedulerDelayedTaskHandler")
+            .build());
+    delayedTaskSchedulerExecutor =
+        MoreExecutors.listeningDecorator(delayedTaskSchedulerExecutorRaw);
+
     ExecutorService schedulerExecutorServiceRaw = Executors.newSingleThreadExecutor(
         new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapScheduler").build());
     schedulerExecutor = MoreExecutors.listeningDecorator(schedulerExecutorServiceRaw);
@@ -266,7 +287,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         + ", vCoresPerInstance=" + coresPerInstance + ", executorsPerInstance="
         + executorsPerInstance + ", resourcePerInstanceInferred=" + resourcePerExecutor
         + ", nodeBlacklistConf=" + nodeBlacklistConf
-        + ", forceLocation=" + forceLocation);
+        + ", localityDelayMs=" + localityDelayMs);
   }
 
   @Override
@@ -279,29 +300,16 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     writeLock.lock();
     try {
       nodeEnablerFuture = nodeEnabledExecutor.submit(nodeEnablerCallable);
-      Futures.addCallback(nodeEnablerFuture, new FutureCallback<Void>() {
-        @Override
-        public void onSuccess(Void result) {
-          LOG.info("NodeEnabledThread exited");
-        }
+      Futures.addCallback(nodeEnablerFuture, new LoggingFutureCallback("NodeEnablerThread", LOG));
+
+      delayedTaskSchedulerFuture =
+          delayedTaskSchedulerExecutor.submit(delayedTaskSchedulerCallable);
+      Futures.addCallback(delayedTaskSchedulerFuture,
+          new LoggingFutureCallback("DelayedTaskSchedulerThread", LOG));
 
-        @Override
-        public void onFailure(Throwable t) {
-          LOG.warn("NodeEnabledThread exited with error", t);
-        }
-      });
       schedulerFuture = schedulerExecutor.submit(schedulerCallable);
-      Futures.addCallback(schedulerFuture, new FutureCallback<Void>() {
-        @Override
-        public void onSuccess(Void result) {
-          LOG.info("SchedulerThread exited");
-        }
+      Futures.addCallback(schedulerFuture, new LoggingFutureCallback("SchedulerThread", LOG));
 
-        @Override
-        public void onFailure(Throwable t) {
-          LOG.warn("SchedulerThread exited with error", t);
-        }
-      });
       registry.start();
       registry.registerStateChangeListener(new NodeStateChangeListener());
       activeInstances = registry.getInstances();
@@ -399,6 +407,12 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         }
         timeoutExecutor.shutdownNow();
 
+        delayedTaskSchedulerCallable.shutdown();
+        if (delayedTaskSchedulerFuture != null) {
+          delayedTaskSchedulerFuture.cancel(true);
+        }
+        delayedTaskSchedulerExecutor.shutdownNow();
+
         schedulerCallable.shutdown();
         if (schedulerFuture != null) {
           schedulerFuture.cancel(true);
@@ -502,6 +516,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   @Override
   public void blacklistNode(NodeId nodeId) {
     LOG.info("BlacklistNode not supported");
+    // TODO HIVE-13484 What happens when we try scheduling a task on a node that Tez at this point thinks is blacklisted.
   }
 
   @Override
@@ -513,7 +528,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
   public void allocateTask(Object task, Resource capability, String[] hosts, String[] racks,
       Priority priority, Object containerSignature, Object clientCookie) {
     TaskInfo taskInfo =
-        new TaskInfo(task, clientCookie, priority, capability, hosts, racks, clock.getTime());
+        new TaskInfo(localityDelayConf, clock, task, clientCookie, priority, capability, hosts, racks, clock.getTime());
     writeLock.lock();
     try {
       dagStats.registerTaskRequest(hosts, racks);
@@ -530,7 +545,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     // Container affinity can be implemented as Host affinity for LLAP. Not required until
     // 1:1 edges are used in Hive.
     TaskInfo taskInfo =
-        new TaskInfo(task, clientCookie, priority, capability, null, null, clock.getTime());
+        new TaskInfo(localityDelayConf, clock, task, clientCookie, priority, capability, null, null, clock.getTime());
     writeLock.lock();
     try {
       dagStats.registerTaskRequest(null, null);
@@ -558,7 +573,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         return false;
       }
       if (taskInfo.containerId == null) {
-        if (taskInfo.assigned) {
+        if (taskInfo.getState() == TaskInfo.State.ASSIGNED) {
           LOG.error("Task: "
               + task
               + " assigned, but could not find the corresponding containerId."
@@ -577,7 +592,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       assert nodeInfo != null;
 
       // Re-enable the node if preempted
-      if (taskInfo.preempted) {
+      if (taskInfo.getState() == TaskInfo.State.PREEMPTED) {
         LOG.info("Processing deallocateTask for {} which was preempted, EndReason={}", task, endReason);
         unregisterPendingPreemption(taskInfo.assignedInstance.getHost());
         nodeInfo.registerUnsuccessfulTaskEnd(true);
@@ -607,7 +622,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           // In case of success, trigger a scheduling run for pending tasks.
           trySchedulingPendingTasks();
 
-        } else if (!taskSucceeded) {
+        } else { // Task Failed
           nodeInfo.registerUnsuccessfulTaskEnd(false);
           if (endReason != null && EnumSet
               .of(TaskAttemptEndReason.EXECUTOR_BUSY, TaskAttemptEndReason.COMMUNICATION_ERROR)
@@ -665,17 +680,13 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     return true;
   }
 
-  private ExecutorService createAppCallbackExecutorService() {
-    return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
-        .setNameFormat("TaskSchedulerAppCaller #%d").setDaemon(true).build());
-  }
-
   /**
    * @param request the list of preferred hosts. null implies any host
    * @return
    */
   private SelectHostResult selectHost(TaskInfo request) {
     String[] requestedHosts = request.requestedHosts;
+    long schedulerAttemptTime = clock.getTime();
     readLock.lock(); // Read-lock. Not updating any stats at the moment.
     try {
       // If there's no memory available, fail
@@ -683,32 +694,61 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         return SELECT_HOST_RESULT_INADEQUATE_TOTAL_CAPACITY;
       }
 
+      boolean shouldDelayForLocality = request.shouldDelayForLocality(schedulerAttemptTime);
       if (requestedHosts != null && requestedHosts.length > 0) {
         int prefHostCount = -1;
-        boolean requestedHostExists = false;
+        boolean requestedHostsWillBecomeAvailable = false;
         for (String host : requestedHosts) {
           prefHostCount++;
           // Pick the first host always. Weak attempt at cache affinity.
           Set<ServiceInstance> instances = activeInstances.getByHost(host);
           if (!instances.isEmpty()) {
-            requestedHostExists = true;
             for (ServiceInstance inst : instances) {
               NodeInfo nodeInfo = instanceToNodeMap.get(inst.getWorkerIdentity());
-              if (nodeInfo != null && nodeInfo.canAcceptTask()) {
-                LOG.info("Assigning " + inst + " when looking for " + host + "." +
-                    " FirstRequestedHost=" + (prefHostCount == 0) +
-                    (requestedHosts.length > 1 ? "#prefLocations=" + requestedHosts.length : ""));
-                return new SelectHostResult(inst, nodeInfo);
+              if (nodeInfo != null) {
+                if (nodeInfo.canAcceptTask()) {
+                  // Successfully scheduled.
+                  LOG.info(
+                      "Assigning " + nodeToString(inst, nodeInfo) + " when looking for " + host +
+                          ". local=true" + " FirstRequestedHost=" + (prefHostCount == 0) +
+                          (requestedHosts.length > 1 ? ", #prefLocations=" + requestedHosts.length :
+                              ""));
+                  return new SelectHostResult(inst, nodeInfo);
+                } else {
+                  // The node cannot accept a task at the moment.
+                  if (shouldDelayForLocality) {
+                    // Perform some checks on whether the node will become available or not.
+                    if (request.shouldForceLocality()) {
+                      requestedHostsWillBecomeAvailable = true;
+                    } else {
+                      if (nodeInfo.getEnableTime() > request.getLocalityDelayTimeout() &&
+                          nodeInfo.isDisabled() && nodeInfo.hadCommFailure()) {
+                        // This node will likely be activated after the task timeout expires.
+                      } else {
+                        // Worth waiting for the timeout.
+                        requestedHostsWillBecomeAvailable = true;
+                      }
+                    }
+                  }
+                }
+              } else {
+                LOG.warn(
+                    "Null NodeInfo when attempting to get host with worker identity {}, and host {}",
+                    inst.getWorkerIdentity(), host);
+                // Leave requestedHostsWillBecomeAvailable as is. If some other host is found - delay;
+                // otherwise this ends up allocating to a random host immediately.
               }
             }
           }
         }
         // Check if forcing the location is required.
-        if (forceLocation) {
-          if (requestedHostExists) {
+        if (shouldDelayForLocality) {
+          if (requestedHostsWillBecomeAvailable) {
             if (LOG.isDebugEnabled()) {
               LOG.debug("Skipping non-local location allocation for [" + request.task +
-                  "] when trying to allocate on [" + Arrays.toString(requestedHosts) + "]");
+                  "] when trying to allocate on [" + Arrays.toString(requestedHosts) + "]" +
+                  ". ScheduleAttemptTime=" + schedulerAttemptTime + ", taskDelayTimeout=" +
+                  request.getLocalityDelayTimeout());
             }
             return SELECT_HOST_RESULT_DELAYED_LOCALITY;
           } else {
@@ -729,10 +769,11 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         for (int i = 0; i < all.length; i++) {
           Entry<String, NodeInfo> inst = all[(i + n) % all.length];
           if (inst.getValue().canAcceptTask()) {
-            LOG.info("Assigning " + inst + " when looking for any host, from #hosts=" + all.length +
-                ", requestedHosts=" +
-                ((requestedHosts == null || requestedHosts.length == 0) ? "null" :
-                    Arrays.toString(requestedHosts)));
+            LOG.info(
+                "Assigning " + nodeToString(inst.getValue().getServiceInstance(), inst.getValue()) +
+                    " when looking for any host, from #hosts=" + all.length + ", requestedHosts=" +
+                    ((requestedHosts == null || requestedHosts.length == 0) ? "null" :
+                        Arrays.toString(requestedHosts)));
             return new SelectHostResult(inst.getValue().getServiceInstance(), inst.getValue());
           }
         }
@@ -820,6 +861,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         tasksAtPriority = new LinkedList<>();
         pendingTasks.put(taskInfo.priority, tasksAtPriority);
       }
+      // Delayed tasks will not kick in right now. That will happen in the scheduling loop.
       tasksAtPriority.add(taskInfo);
       knownTasks.putIfAbsent(taskInfo.task, taskInfo);
       if (metrics != null) {
@@ -870,7 +912,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     try {
       TaskInfo taskInfo = knownTasks.remove(task);
       if (taskInfo != null) {
-        if (taskInfo.assigned) {
+        if (taskInfo.getState() == TaskInfo.State.ASSIGNED) {
           // Remove from the running list.
           int priority = taskInfo.priority.getPriority();
           Set<TaskInfo> tasksAtPriority = runningTasks.get(priority);
@@ -925,6 +967,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           }
           taskInfo.triedAssigningTask();
           ScheduleResult scheduleResult = scheduleTask(taskInfo);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("ScheduleResult for Task: {} = {}", taskInfo, scheduleResult);
+          }
           if (scheduleResult == ScheduleResult.SCHEDULED) {
             taskIter.remove();
           } else {
@@ -938,6 +983,11 @@ public class LlapTaskSchedulerService extends TaskScheduler {
             // Preempt only if there's no pending preemptions to avoid preempting twice for a task.
             String[] potentialHosts;
             if (scheduleResult == ScheduleResult.DELAYED_LOCALITY) {
+
+              // Add the task to the delayed task queue if it does not already exist.
+              maybeAddToDelayedTaskQueue(taskInfo);
+
+              // Try preempting a lower priority task in any case.
               // preempt only on specific hosts, if no preemptions already exist on those.
               potentialHosts = taskInfo.requestedHosts;
               //Protect against a bad location being requested.
@@ -1008,7 +1058,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       Container container =
           containerFactory.createContainer(resourcePerExecutor, taskInfo.priority,
               nsPair.getServiceInstance().getHost(),
-              nsPair.getServiceInstance().getRpcPort());
+              nsPair.getServiceInstance().getRpcPort(),
+              nsPair.getServiceInstance().getServicesAddress());
       writeLock.lock(); // While updating local structures
       try {
         LOG.info("Assigned task {} to container {}", taskInfo, container.getId());
@@ -1125,9 +1176,81 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     }
   }
 
+  private void maybeAddToDelayedTaskQueue(TaskInfo taskInfo) {
+    // There's no point adding a task with forceLocality set - since that will never exit the queue.
+    // Add other tasks if they are not already in the queue.
+    if (!taskInfo.shouldForceLocality() && !taskInfo.isInDelayedQueue()) {
+      taskInfo.setInDelayedQueue(true);
+      delayedTaskQueue.add(taskInfo);
+    }
+  }
+
+  private String nodeToString(ServiceInstance serviceInstance, NodeInfo nodeInfo) {
+    return serviceInstance.getHost() + ":" + serviceInstance.getRpcPort() + ", workerIdentity=" +
+        serviceInstance.getWorkerIdentity() + ", webAddress=" +
+        serviceInstance.getServicesAddress() + ", currentlyScheduledTasksOnNode=" + nodeInfo.numScheduledTasks;
+  }
+
+  // ------ Inner classes defined after this point ------
+
+  @VisibleForTesting
+  class DelayedTaskSchedulerCallable implements Callable<Void> {
+
+    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+
+    @Override
+    public Void call() {
+      while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
+        try {
+          TaskInfo taskInfo = getNextTask();
+          taskInfo.setInDelayedQueue(false);
+          // Tasks can exist in the delayed queue even after they have been scheduled.
+          // Trigger scheduling only if the task is still in PENDING state.
+          processEvictedTask(taskInfo);
+
+        } catch (InterruptedException e) {
+          if (isShutdown.get()) {
+            LOG.info("DelayedTaskScheduler thread interrupted after shutdown");
+            break;
+          } else {
+            LOG.warn("DelayedTaskScheduler thread interrupted before being shutdown");
+            throw new RuntimeException(
+                "DelayedTaskScheduler thread interrupted without being shutdown", e);
+          }
+        }
+      }
+      return null;
+    }
+
+    public void shutdown() {
+      isShutdown.set(true);
+    }
+
+    public TaskInfo getNextTask() throws InterruptedException {
+      return delayedTaskQueue.take();
+    }
+
+    public void processEvictedTask(TaskInfo taskInfo) {
+      if (shouldScheduleTask(taskInfo)) {
+        trySchedulingPendingTasks();
+      }
+    }
+
+    public boolean shouldScheduleTask(TaskInfo taskInfo) {
+      return taskInfo.getState() == TaskInfo.State.PENDING;
+    }
+  }
+
+  @VisibleForTesting
+  DelayedTaskSchedulerCallable createDelayedTaskSchedulerCallable() {
+    return new DelayedTaskSchedulerCallable();
+  }
+
   private class NodeEnablerCallable implements Callable<Void> {
 
-    private AtomicBoolean isShutdown = new AtomicBoolean(false);
+    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
     private static final long REFRESH_INTERVAL = 10000l;
     long nextPollInterval = REFRESH_INTERVAL;
     long lastRefreshTime;
@@ -1135,13 +1258,13 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     @Override
     public Void call() {
 
-      lastRefreshTime = System.currentTimeMillis();
+      lastRefreshTime = LlapTaskSchedulerService.this.clock.getTime();
       while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
         try {
           while (true) {
             NodeInfo nodeInfo = disabledNodesQueue.poll(nextPollInterval, TimeUnit.MILLISECONDS);
             if (nodeInfo != null) {
-              long currentTime = System.currentTimeMillis();
+              long currentTime = LlapTaskSchedulerService.this.clock.getTime();
               // A node became available. Enable the node and try scheduling.
               reenableDisabledNode(nodeInfo);
               trySchedulingPendingTasks();
@@ -1152,7 +1275,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
             if (nextPollInterval < 0 || nodeInfo == null) {
               // timeout expired. Reset the poll interval and refresh nodes.
               nextPollInterval = REFRESH_INTERVAL;
-              lastRefreshTime = System.currentTimeMillis();
+              lastRefreshTime = LlapTaskSchedulerService.this.clock.getTime();
              // TODO Get rid of this polling once we have notifications from the registry sub-system
               if (LOG.isDebugEnabled()) {
                 LOG.debug("Refreshing instances based on poll interval");
@@ -1232,6 +1355,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         // will be handled in the next run.
         // A new request may come in right after this is set to false, but before the actual scheduling.
         // This will be handled in this run, but will cause an immediate run after, which is harmless.
+        // This is mainly to handle a trySchedule request while in the middle of a run - since the event
+        // which triggered it may not be processed for all tasks in the run.
         pendingScheduleInvocations.set(false);
         // Schedule outside of the scheduleLock - which should only be used to wait on the condition.
         schedulePendingTasks();
@@ -1245,6 +1370,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     }
   }
 
+  // ------ Additional static classes defined after this point ------
+
   @VisibleForTesting
   static class NodeInfo implements Delayed {
     private final NodeBlacklistConf blacklistConf;
@@ -1257,6 +1384,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     float cumulativeBackoffFactor = 1.0f;
 
     // Indicates whether a node had a recent communication failure.
+    // This is primarily for tracking and logging purposes for the moment.
+    // TODO At some point, treat task rejection and communication failures differently.
     private boolean hadCommFailure = false;
 
     // Indicates whether a node is disabled - for whatever reason - commFailure, busy, etc.
@@ -1375,6 +1504,13 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       }
     }
 
+    /**
+     * @return the time at which this node will be re-enabled
+     */
+    public long getEnableTime() {
+      return expireTimeMillis;
+    }
+
     public boolean isDisabled() {
       return disabled;
     }
@@ -1382,13 +1518,20 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     public boolean hadCommFailure() {
       return hadCommFailure;
     }
+
     /* Returning true does not guarantee that the task will run, since other queries
        may be running in the system. It also depends upon the capacity usage configuration. */
     public boolean canAcceptTask() {
       boolean result = !hadCommFailure && !disabled && serviceInstance.isAlive()
           &&(numSchedulableTasks == -1 || ((numSchedulableTasks - numScheduledTasks) > 0));
-      LOG.info("canAcceptTask={}, numScheduledTasks={}, numSchedulableTasks={}, hadCommFailure={}, disabled={}, serviceInstance.isAlive={}", result, numScheduledTasks, numSchedulableTasks, hadCommFailure, disabled, serviceInstance.isAlive());
+      if (LOG.isInfoEnabled()) {
+        LOG.info("Node[" + serviceInstance.getHost() + ":" + serviceInstance.getRpcPort() + ", " +
+                serviceInstance.getWorkerIdentity() + "]: " +
+                "canAcceptTask={}, numScheduledTasks={}, numSchedulableTasks={}, hadCommFailure={}, disabled={}, serviceInstance.isAlive={}",
+            result, numScheduledTasks, numSchedulableTasks, hadCommFailure, disabled,
+            serviceInstance.isAlive());
+      }
       return result;
     }
 
@@ -1512,11 +1655,23 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     }
   }
 
-  private static class TaskInfo {
+
+  // TODO There needs to be a mechanism to figure out different attempts for the same task. Delays
+  // could potentially be changed based on this.
+
+  @VisibleForTesting
+  static class TaskInfo implements Delayed {
+
+    enum State {
+      PENDING, ASSIGNED, PREEMPTED
+    }
+
     // IDs used to ensure two TaskInfos are different without using the underlying task instance.
     // Required for insertion into a TreeMap
     static final AtomicLong ID_GEN = new AtomicLong(0);
     final long uniqueId;
+    final LocalityDelayConf localityDelayConf;
+    final Clock clock;
     final Object task;
     final Object clientCookie;
     final Priority priority;
@@ -1524,19 +1679,22 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     final String[] requestedHosts;
     final String[] requestedRacks;
     final long requestTime;
+    final long localityDelayTimeout;
     long startTime;
     long preemptTime;
     ContainerId containerId;
     ServiceInstance assignedInstance;
-    private boolean assigned = false;
-    private boolean preempted = false;
+    private State state = State.PENDING;
+    boolean inDelayedQueue = false;
 
     private int numAssignAttempts = 0;
 
     // TaskInfo instances for two different tasks will not be the same. Only a single instance should
     // ever be created for a taskAttempt
-    public TaskInfo(Object task, Object clientCookie, Priority priority, Resource capability,
+    public TaskInfo(LocalityDelayConf localityDelayConf, Clock clock, Object task,
+        Object clientCookie, Priority priority, Resource capability,
         String[] hosts, String[] racks, long requestTime) {
+      this.localityDelayConf = localityDelayConf;
+      this.clock = clock;
       this.task = task;
       this.clientCookie = clientCookie;
       this.priority = priority;
@@ -1544,30 +1702,61 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       this.requestedHosts = hosts;
       this.requestedRacks = racks;
       this.requestTime = requestTime;
+      if (localityDelayConf.getNodeLocalityDelay() == -1) {
+        localityDelayTimeout = Long.MAX_VALUE;
+      } else if (localityDelayConf.getNodeLocalityDelay() == 0) {
+        localityDelayTimeout = 0L;
+      } else {
+        localityDelayTimeout = requestTime + localityDelayConf.getNodeLocalityDelay();
+      }
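+      // Example: with nodeLocalityDelay=3000 and requestTime=t0, locality is preferred
+      // until t0+3000; -1 forces locality indefinitely, and 0 disables the delay entirely.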
       this.uniqueId = ID_GEN.getAndIncrement();
     }
 
-    void setAssignmentInfo(ServiceInstance instance, ContainerId containerId, long startTime) {
+    synchronized void setAssignmentInfo(ServiceInstance instance, ContainerId containerId, long startTime) {
       this.assignedInstance = instance;
-        this.containerId = containerId;
+      this.containerId = containerId;
       this.startTime = startTime;
-      assigned = true;
+      this.state = State.ASSIGNED;
     }
 
-    void setPreemptedInfo(long preemptTime) {
-      this.preempted = true;
-      this.assigned = false;
+    synchronized void setPreemptedInfo(long preemptTime) {
+      this.state = State.PREEMPTED;
       this.preemptTime = preemptTime;
     }
 
-    void triedAssigningTask() {
+    synchronized void setInDelayedQueue(boolean val) {
+      this.inDelayedQueue = val;
+    }
+
+    synchronized void triedAssigningTask() {
       numAssignAttempts++;
     }
 
-    int getNumPreviousAssignAttempts() {
+    synchronized int getNumPreviousAssignAttempts() {
       return numAssignAttempts;
     }
 
+    synchronized State getState() {
+      return state;
+    }
+
+    synchronized boolean isInDelayedQueue() {
+      return inDelayedQueue;
+    }
+
+    boolean shouldDelayForLocality(long schedulerAttemptTime) {
+      // getDelay <=0 means the task will be evicted from the queue.
+      return localityDelayTimeout > schedulerAttemptTime;
+    }
+
+    boolean shouldForceLocality() {
+      return localityDelayTimeout == Long.MAX_VALUE;
+    }
+
+    long getLocalityDelayTimeout() {
+      return localityDelayTimeout;
+    }
+
     @Override
     public boolean equals(Object o) {
       if (this == o) {
@@ -1602,8 +1791,26 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           ", containerId=" + containerId +
           ", assignedInstance=" + assignedInstance +
           ", uniqueId=" + uniqueId +
+          ", localityDelayTimeout=" + localityDelayTimeout +
           '}';
     }
+
+    @Override
+    public long getDelay(TimeUnit unit) {
+      return unit.convert(localityDelayTimeout - clock.getTime(), TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public int compareTo(Delayed o) {
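+      // Order by localityDelayTimeout so that the DelayQueue head is always the task
+      // whose locality delay expires first (consistent with getDelay above).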
+      TaskInfo other = (TaskInfo) o;
+      if (other.localityDelayTimeout > this.localityDelayTimeout) {
+        return -1;
+      } else if (other.localityDelayTimeout < this.localityDelayTimeout) {
+        return 1;
+      } else {
+        return 0;
+      }
+    }
   }
 
   // Newer tasks first.
@@ -1689,4 +1896,24 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           '}';
     }
   }
+
+  @VisibleForTesting
+  static final class LocalityDelayConf {
+    private final long nodeLocalityDelay;
+
+    public LocalityDelayConf(long nodeLocalityDelay) {
+      this.nodeLocalityDelay = nodeLocalityDelay;
+    }
+
+    public long getNodeLocalityDelay() {
+      return nodeLocalityDelay;
+    }
+
+    @Override
+    public String toString() {
+      return "LocalityDelayConf{" +
+          "nodeLocalityDelay=" + nodeLocalityDelay +
+          '}';
+    }
+  }
 }
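
The core pattern in this change is the DelayQueue<TaskInfo> drained by
DelayedTaskSchedulerCallable: TaskInfo implements Delayed, so take() returns a task
only once its locality delay timeout expires, at which point scheduling is re-triggered
if the task is still PENDING. A minimal, self-contained sketch of the pattern (the names
below are illustrative stand-ins, not the actual Hive classes, and
System.currentTimeMillis() stands in for the scheduler's pluggable Clock):

    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;

    public class DelayedSchedulerSketch {

      // Stand-in for TaskInfo: ordered by an absolute deadline (localityDelayTimeout).
      static class DelayedTask implements Delayed {
        final String name;
        final long deadlineMillis;

        DelayedTask(String name, long deadlineMillis) {
          this.name = name;
          this.deadlineMillis = deadlineMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
          // <= 0 makes the task eligible to leave the queue, as in TaskInfo.getDelay().
          return unit.convert(deadlineMillis - System.currentTimeMillis(),
              TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
          return Long.compare(deadlineMillis, ((DelayedTask) o).deadlineMillis);
        }
      }

      public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        long now = System.currentTimeMillis();
        queue.add(new DelayedTask("task2", now + 2000));
        queue.add(new DelayedTask("task1", now + 1000));
        // take() blocks until the earliest deadline passes, mirroring
        // DelayedTaskSchedulerCallable.getNextTask(); task1 comes out first.
        while (!queue.isEmpty()) {
          DelayedTask t = queue.take();
          System.out.println("locality delay expired for " + t.name);
        }
      }
    }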

http://git-wip-us.apache.org/repos/asf/hive/blob/347a5a55/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/MonotonicClock.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/MonotonicClock.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/MonotonicClock.java
new file mode 100644
index 0000000..aaa6f91
--- /dev/null
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/helpers/MonotonicClock.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.tezplugins.helpers;
+
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.util.Clock;
+
+public class MonotonicClock implements Clock {
+  @Override
+  public long getTime() {
+    return Time.monotonicNow();
+  }
+}
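
Time.monotonicNow() is used rather than System.currentTimeMillis() because the
delayed-task deadlines are absolute: a wall-clock adjustment (e.g. NTP) could otherwise
make getDelay() jump and release or hold tasks at the wrong time. A small sketch of
deadline arithmetic against this clock:

    import org.apache.hadoop.util.Time;

    public class MonotonicDeadlineSketch {
      public static void main(String[] args) throws InterruptedException {
        // Monotonic time only moves forward, so deadline comparisons stay safe
        // even if the system clock is adjusted underneath us.
        long deadline = Time.monotonicNow() + 3000; // e.g. a 3s locality delay
        Thread.sleep(100);
        long remaining = deadline - Time.monotonicNow(); // always <= 3000 here
        System.out.println("remaining ms: " + remaining);
      }
    }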

http://git-wip-us.apache.org/repos/asf/hive/blob/347a5a55/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/scheduler/LoggingFutureCallback.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/scheduler/LoggingFutureCallback.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/scheduler/LoggingFutureCallback.java
new file mode 100644
index 0000000..ea700da
--- /dev/null
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/scheduler/LoggingFutureCallback.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tezplugins.scheduler;
+
+import java.util.concurrent.CancellationException;
+
+import com.google.common.util.concurrent.FutureCallback;
+import org.slf4j.Logger;
+
+public final class LoggingFutureCallback implements FutureCallback<Void> {
+  private final String componentName;
+  private final Logger LOG;
+
+  public LoggingFutureCallback(String componentName, Logger log) {
+    this.componentName = componentName;
+    LOG = log;
+  }
+
+  @Override
+  public void onSuccess(Void result) {
+    LOG.info("{} exited", componentName);
+  }
+
+  @Override
+  public void onFailure(Throwable t) {
+    if (t instanceof CancellationException) {
+      LOG.info("{} was cancelled", componentName, t.getMessage());
+    } else {
+      LOG.warn("{} exited with error", componentName, t);
+    }
+  }
+}
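
One plausible wiring for this callback - a sketch only, since the actual call site in
the scheduler is not part of this hunk, and the three-argument addCallback overload
assumes a Guava version that provides MoreExecutors.directExecutor():

    import java.util.concurrent.Executors;

    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.ListeningExecutorService;
    import com.google.common.util.concurrent.MoreExecutors;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class CallbackWiringSketch {
      private static final Logger LOG = LoggerFactory.getLogger(CallbackWiringSketch.class);

      public static void main(String[] args) {
        ListeningExecutorService executor =
            MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
        // Stands in for submitting DelayedTaskSchedulerCallable.
        ListenableFuture<Void> future = executor.submit(() -> (Void) null);
        // Success, cancellation and failure are all logged uniformly.
        Futures.addCallback(future,
            new LoggingFutureCallback("DelayedTaskScheduler", LOG),
            MoreExecutors.directExecutor());
        executor.shutdown();
      }
    }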

http://git-wip-us.apache.org/repos/asf/hive/blob/347a5a55/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
index b2cd55e..e4fe79c 100644
--- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
+++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
@@ -15,7 +15,9 @@
 package org.apache.hadoop.hive.llap.tezplugins;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
@@ -26,6 +28,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
+import java.util.concurrent.DelayQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -37,6 +40,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.registry.impl.LlapFixedRegistryImpl;
 import org.apache.hadoop.hive.llap.testhelpers.ControlledClock;
+import org.apache.hadoop.hive.llap.tezplugins.helpers.MonotonicClock;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -44,7 +49,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
@@ -62,7 +66,7 @@ public class TestLlapTaskSchedulerService {
   private static final String HOST2 = "host2";
   private static final String HOST3 = "host3";
 
-  @Test (timeout = 5000)
+  @Test(timeout = 10000)
   public void testSimpleLocalAllocation() throws IOException, InterruptedException {
 
     TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper();
@@ -77,18 +81,17 @@ public class TestLlapTaskSchedulerService {
       tsWrapper.controlScheduler(true);
       tsWrapper.allocateTask(task1, hosts1, priority1, clientCookie1);
 
-      tsWrapper.signalSchedulerRun();
-      tsWrapper.awaitSchedulerRun();
+      tsWrapper.awaitLocalTaskAllocations(1);
 
       verify(tsWrapper.mockAppCallback).taskAllocated(eq(task1), eq(clientCookie1), any(Container.class));
-      // TODO Verify this is on host1.
       assertEquals(1, tsWrapper.ts.dagStats.numLocalAllocations);
+      assertEquals(1, tsWrapper.ts.dagStats.numAllocationsPerHost.get(HOST1).get());
     } finally {
       tsWrapper.shutdown();
     }
   }
 
-  @Test (timeout = 5000)
+  @Test(timeout = 10000)
   public void testSimpleNoLocalityAllocation() throws IOException, InterruptedException {
     TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper();
 
@@ -99,8 +102,7 @@ public class TestLlapTaskSchedulerService {
       Object clientCookie1 = new Object();
       tsWrapper.controlScheduler(true);
       tsWrapper.allocateTask(task1, null, priority1, clientCookie1);
-      tsWrapper.signalSchedulerRun();
-      tsWrapper.awaitSchedulerRun();
+      tsWrapper.awaitTotalTaskAllocations(1);
       verify(tsWrapper.mockAppCallback).taskAllocated(eq(task1), eq(clientCookie1), any(Container.class));
       assertEquals(1, tsWrapper.ts.dagStats.numAllocationsNoLocalityRequest);
     } finally {
@@ -109,7 +111,7 @@ public class TestLlapTaskSchedulerService {
   }
 
 
-  @Test(timeout=5000)
+  @Test(timeout = 10000)
   public void testPreemption() throws InterruptedException, IOException {
 
     Priority priority1 = Priority.newInstance(1);
@@ -174,7 +176,7 @@ public class TestLlapTaskSchedulerService {
 
   }
 
-  @Test(timeout=5000)
+  @Test(timeout = 10000)
   public void testNodeDisabled() throws IOException, InterruptedException {
     TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(10000l);
     try {
@@ -233,7 +235,7 @@ public class TestLlapTaskSchedulerService {
     }
   }
 
-  @Test(timeout=5000)
+  @Test(timeout = 10000)
   public void testNodeReEnabled() throws InterruptedException, IOException {
     // Based on actual timing.
     TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(1000l);
@@ -307,7 +309,7 @@ public class TestLlapTaskSchedulerService {
     }
   }
 
-  @Test (timeout = 5000)
+  @Test(timeout = 10000)
   public void testForceLocalityTest1() throws IOException, InterruptedException {
     // 2 hosts. 2 per host. 5 requests at the same priority.
     // First 3 on host1, Next at host2, Last with no host.
@@ -316,7 +318,7 @@ public class TestLlapTaskSchedulerService {
 
   }
 
-  @Test (timeout = 5000)
+  @Test(timeout = 10000)
   public void testNoForceLocalityCounterTest1() throws IOException, InterruptedException {
     // 2 hosts. 2 per host. 5 requests at the same priority.
     // First 3 on host1, Next at host2, Last with no host.
@@ -411,7 +413,7 @@ public class TestLlapTaskSchedulerService {
     }
   }
 
-  @Test(timeout = 5000)
+  @Test(timeout = 10000)
   public void testForcedLocalityUnknownHost() throws IOException, InterruptedException {
     Priority priority1 = Priority.newInstance(1);
 
@@ -454,15 +456,13 @@ public class TestLlapTaskSchedulerService {
     }
   }
 
-
-  @Test(timeout = 5000)
+  @Test(timeout = 10000)
   public void testForcedLocalityPreemption() throws IOException, InterruptedException {
     Priority priority1 = Priority.newInstance(1);
     Priority priority2 = Priority.newInstance(2);
     String [] hosts = new String[] {HOST1, HOST2};
 
     String [] hostsH1 = new String[] {HOST1};
-    String [] hostsH2 = new String[] {HOST2};
 
     TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, -1l);
 
@@ -485,13 +485,7 @@ public class TestLlapTaskSchedulerService {
       tsWrapper.allocateTask(task2, hostsH1, priority2, clientCookie2);
       // This request at a lower priority should not affect anything.
       tsWrapper.allocateTask(task3, hostsH1, priority2, clientCookie3);
-      while (true) {
-        tsWrapper.signalSchedulerRun();
-        tsWrapper.awaitSchedulerRun();
-        if (tsWrapper.ts.dagStats.numLocalAllocations == 2) {
-          break;
-        }
-      }
+      tsWrapper.awaitLocalTaskAllocations(2);
 
       verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
       ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
@@ -517,13 +511,8 @@ public class TestLlapTaskSchedulerService {
 
       tsWrapper.deallocateTask(task1, false, TaskAttemptEndReason.INTERNAL_PREEMPTION);
 
-      while (true) {
-        tsWrapper.signalSchedulerRun();
-        tsWrapper.awaitSchedulerRun();
-        if (tsWrapper.ts.dagStats.numTotalAllocations == 3) {
-          break;
-        }
-      }
+      tsWrapper.awaitLocalTaskAllocations(3);
+
       verify(tsWrapper.mockAppCallback, times(1)).taskAllocated(eq(task4),
           eq(clientCookie4), any(Container.class));
 
@@ -532,11 +521,471 @@ public class TestLlapTaskSchedulerService {
     }
   }
 
+  @Test(timeout = 10000)
+  public void testForcedLocalityNotInDelayedQueue() throws IOException, InterruptedException {
+    String[] hosts = new String[]{HOST1, HOST2};
+
+    String[] hostsH1 = new String[]{HOST1};
+
+    TestTaskSchedulerServiceWrapper tsWrapper =
+        new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, -1l);
+    testNotInQueue(tsWrapper, hostsH1);
+  }
+
+  @Test(timeout = 10000)
+  public void testNoLocalityNotInDelayedQueue() throws IOException, InterruptedException {
+    String[] hosts = new String[]{HOST1};
+
+    String[] hostsH1 = new String[]{HOST1};
+
+    TestTaskSchedulerServiceWrapper tsWrapper =
+        new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, 0l);
+    testNotInQueue(tsWrapper, hostsH1);
+  }
+
+  private void testNotInQueue(TestTaskSchedulerServiceWrapper tsWrapper, String[] hosts) throws
+      InterruptedException {
+    Priority priority1 = Priority.newInstance(1);
+    try {
+      tsWrapper.controlScheduler(true);
+      tsWrapper.allocateTask(hosts, priority1);
+      tsWrapper.allocateTask(hosts, priority1);
+      tsWrapper.allocateTask(hosts, priority1); // 1 more than capacity.
+
+      tsWrapper.awaitLocalTaskAllocations(2);
+
+      assertEquals(0, tsWrapper.ts.delayedTaskQueue.size());
+
+    } finally {
+      tsWrapper.shutdown();
+    }
+  }
+
+  @Test(timeout = 10000)
+  public void testDelayedLocalityFallbackToNonLocal() throws IOException, InterruptedException {
+    Priority priority1 = Priority.newInstance(1);
+    String [] hosts = new String[] {HOST1, HOST2};
+
+    String [] hostsH1 = new String[] {HOST1};
+
+    TestTaskSchedulerServiceWrapper tsWrapper =
+        new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, 10000l, true);
+    LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled
+        delayedTaskSchedulerCallableControlled =
+        (LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled) tsWrapper.ts.delayedTaskSchedulerCallable;
+    ControlledClock clock = tsWrapper.getClock();
+    clock.setTime(clock.getTime());
+
+    // Fill up host1 with tasks. Leave host2 empty.
+    try {
+      tsWrapper.controlScheduler(true);
+      Object task1 = tsWrapper.allocateTask(hostsH1, priority1);
+      Object task2 = tsWrapper.allocateTask(hostsH1, priority1);
+      Object task3 = tsWrapper.allocateTask(hostsH1, priority1); // 1 more than capacity.
+
+      tsWrapper.awaitLocalTaskAllocations(2);
+
+      verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+      ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
+      verify(tsWrapper.mockAppCallback, times(2))
+          .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
+      assertEquals(2, argumentCaptor.getAllValues().size());
+      assertEquals(task1, argumentCaptor.getAllValues().get(0));
+      assertEquals(task2, argumentCaptor.getAllValues().get(1));
+
+      reset(tsWrapper.mockAppCallback);
+
+      // No capacity left on node1. The next task should be allocated to node2 after it times out.
+      clock.setTime(clock.getTime() + 10000l); // Past the timeout.
+
+      assertEquals(
+          LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_NOT_RUN,
+          delayedTaskSchedulerCallableControlled.lastState);
+
+      delayedTaskSchedulerCallableControlled.triggerGetNextTask();
+      delayedTaskSchedulerCallableControlled.awaitGetNextTaskProcessing();
+
+      // Verify that the task was returned from the delayed queue and that the decision was to schedule it
+      assertEquals(
+          LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_RETURNED_TASK,
+          delayedTaskSchedulerCallableControlled.lastState);
+      assertTrue(delayedTaskSchedulerCallableControlled.shouldScheduleTaskTriggered &&
+          delayedTaskSchedulerCallableControlled.lastShouldScheduleTaskResult);
+
+      tsWrapper.awaitChangeInTotalAllocations(2);
+
+      verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+      argumentCaptor = ArgumentCaptor.forClass(Object.class);
+      ArgumentCaptor<Container> containerCaptor = ArgumentCaptor.forClass(Container.class);
+      verify(tsWrapper.mockAppCallback, times(1))
+          .taskAllocated(argumentCaptor.capture(), any(Object.class), containerCaptor.capture());
+      assertEquals(1, argumentCaptor.getAllValues().size());
+      assertEquals(task3, argumentCaptor.getAllValues().get(0));
+      Container assignedContainer = containerCaptor.getValue();
+      assertEquals(HOST2, assignedContainer.getNodeId().getHost());
+
+
+      assertEquals(2, tsWrapper.ts.dagStats.numLocalAllocations);
+      assertEquals(1, tsWrapper.ts.dagStats.numNonLocalAllocations);
+      assertEquals(1, tsWrapper.ts.dagStats.numDelayedAllocations);
+      assertEquals(2, tsWrapper.ts.dagStats.numAllocationsPerHost.get(HOST1).get());
+      assertEquals(1, tsWrapper.ts.dagStats.numAllocationsPerHost.get(HOST2).get());
+
+    } finally {
+      tsWrapper.shutdown();
+    }
+  }
+
+  @Test(timeout = 10000)
+  public void testDelayedLocalityDelayedAllocation() throws InterruptedException, IOException {
+    Priority priority1 = Priority.newInstance(1);
+    String [] hosts = new String[] {HOST1, HOST2};
+
+    String [] hostsH1 = new String[] {HOST1};
+
+    TestTaskSchedulerServiceWrapper tsWrapper =
+        new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, 10000l, true);
+    LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled
+        delayedTaskSchedulerCallableControlled =
+        (LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled) tsWrapper.ts.delayedTaskSchedulerCallable;
+    ControlledClock clock = tsWrapper.getClock();
+    clock.setTime(clock.getTime());
+
+    // Fill up host1 with tasks. Leave host2 empty.
+    try {
+      tsWrapper.controlScheduler(true);
+      Object task1 = tsWrapper.allocateTask(hostsH1, priority1);
+      Object task2 = tsWrapper.allocateTask(hostsH1, priority1);
+      Object task3 = tsWrapper.allocateTask(hostsH1, priority1); // 1 more than capacity.
+
+      tsWrapper.awaitLocalTaskAllocations(2);
+
+      verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+      ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
+      verify(tsWrapper.mockAppCallback, times(2))
+          .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
+      assertEquals(2, argumentCaptor.getAllValues().size());
+      assertEquals(task1, argumentCaptor.getAllValues().get(0));
+      assertEquals(task2, argumentCaptor.getAllValues().get(1));
+
+      reset(tsWrapper.mockAppCallback);
+
+      // Move the clock forward 2000ms, and check the delayed queue
+      clock.setTime(clock.getTime() + 2000l); // Past the timeout.
+
+      assertEquals(
+          LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_NOT_RUN,
+          delayedTaskSchedulerCallableControlled.lastState);
+
+      delayedTaskSchedulerCallableControlled.triggerGetNextTask();
+      delayedTaskSchedulerCallableControlled.awaitGetNextTaskProcessing();
+
+      // Verify that the locality timeout has not expired, so no attempt was made to schedule the task
+      assertEquals(
+          LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_TIMEOUT_NOT_EXPIRED,
+          delayedTaskSchedulerCallableControlled.lastState);
+      assertFalse(delayedTaskSchedulerCallableControlled.shouldScheduleTaskTriggered);
+
+      tsWrapper.deallocateTask(task1, true, null);
+
+      // Node1 now has free capacity. task1 should be allocated to it.
+      tsWrapper.awaitChangeInTotalAllocations(2);
+
+      verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+      argumentCaptor = ArgumentCaptor.forClass(Object.class);
+      ArgumentCaptor<Container> containerCaptor = ArgumentCaptor.forClass(Container.class);
+      verify(tsWrapper.mockAppCallback, times(1))
+          .taskAllocated(argumentCaptor.capture(), any(Object.class), containerCaptor.capture());
+      assertEquals(1, argumentCaptor.getAllValues().size());
+      assertEquals(task3, argumentCaptor.getAllValues().get(0));
+      Container assignedContainer = containerCaptor.getValue();
+      assertEquals(HOST1, assignedContainer.getNodeId().getHost());
+
+
+      assertEquals(3, tsWrapper.ts.dagStats.numLocalAllocations);
+      assertEquals(0, tsWrapper.ts.dagStats.numNonLocalAllocations);
+      assertEquals(1, tsWrapper.ts.dagStats.numDelayedAllocations);
+      assertEquals(3, tsWrapper.ts.dagStats.numAllocationsPerHost.get(HOST1).get());
+
+    } finally {
+      tsWrapper.shutdown();
+    }
+  }
+
+  @Test(timeout = 10000)
+  public void testDelayedQueueTaskSelectionAfterScheduled() throws IOException,
+      InterruptedException {
+    Priority priority1 = Priority.newInstance(1);
+    String [] hosts = new String[] {HOST1, HOST2};
+
+    String [] hostsH1 = new String[] {HOST1};
+
+    TestTaskSchedulerServiceWrapper tsWrapper =
+        new TestTaskSchedulerServiceWrapper(2000, hosts, 1, 1, 10000l, true);
+    LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled
+        delayedTaskSchedulerCallableControlled =
+        (LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled) tsWrapper.ts.delayedTaskSchedulerCallable;
+    ControlledClock clock = tsWrapper.getClock();
+    clock.setTime(clock.getTime());
+
+    // Fill up host1 with tasks. Leave host2 empty.
+    try {
+      tsWrapper.controlScheduler(true);
+      Object task1 = tsWrapper.allocateTask(hostsH1, priority1);
+      Object task2 = tsWrapper.allocateTask(hostsH1, priority1);
+      Object task3 = tsWrapper.allocateTask(hostsH1, priority1); // 1 more than capacity.
+
+      tsWrapper.awaitLocalTaskAllocations(2);
+
+      verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+      ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
+      verify(tsWrapper.mockAppCallback, times(2))
+          .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
+      assertEquals(2, argumentCaptor.getAllValues().size());
+      assertEquals(task1, argumentCaptor.getAllValues().get(0));
+      assertEquals(task2, argumentCaptor.getAllValues().get(1));
+
+      // Simulate a 2s delay before finishing the task.
+      clock.setTime(clock.getTime() + 2000);
+
+      assertEquals(
+          LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_NOT_RUN,
+          delayedTaskSchedulerCallableControlled.lastState);
+
+      delayedTaskSchedulerCallableControlled.triggerGetNextTask();
+      delayedTaskSchedulerCallableControlled.awaitGetNextTaskProcessing();
+      assertEquals(
+          LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_TIMEOUT_NOT_EXPIRED,
+          delayedTaskSchedulerCallableControlled.lastState);
+      assertFalse(delayedTaskSchedulerCallableControlled.shouldScheduleTaskTriggered);
+
+      reset(tsWrapper.mockAppCallback);
+
+      // Now finish task1, which will make capacity for task3 to run. Nothing is coming out of the delayed queue yet.
+      tsWrapper.deallocateTask(task1, true, null);
+      tsWrapper.awaitLocalTaskAllocations(3);
+
+      verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+      argumentCaptor = ArgumentCaptor.forClass(Object.class);
+      ArgumentCaptor<Container> containerCaptor = ArgumentCaptor.forClass(Container.class);
+      verify(tsWrapper.mockAppCallback, times(1))
+          .taskAllocated(argumentCaptor.capture(), any(Object.class), containerCaptor.capture());
+      assertEquals(1, argumentCaptor.getAllValues().size());
+      assertEquals(task3, argumentCaptor.getAllValues().get(0));
+      Container assignedContainer = containerCaptor.getValue();
+      assertEquals(HOST1, assignedContainer.getNodeId().getHost());
+
+      reset(tsWrapper.mockAppCallback);
+
+      // Move the clock forward and trigger a run.
+      clock.setTime(clock.getTime() + 8000); // Set to start + 10000 which is the timeout
+      delayedTaskSchedulerCallableControlled.triggerGetNextTask();
+      delayedTaskSchedulerCallableControlled.awaitGetNextTaskProcessing();
+      assertEquals(
+          LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled.STATE_RETURNED_TASK,
+          delayedTaskSchedulerCallableControlled.lastState);
+      // Verify that an attempt was made to schedule the task, but the decision was to skip scheduling
+      assertTrue(delayedTaskSchedulerCallableControlled.shouldScheduleTaskTriggered &&
+          !delayedTaskSchedulerCallableControlled.lastShouldScheduleTaskResult);
+
+      // Ensure there's no more invocations.
+      verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+      verify(tsWrapper.mockAppCallback, never()).taskAllocated(any(Object.class), any(Object.class), any(Container.class));
+
+    } finally {
+      tsWrapper.shutdown();
+    }
+  }
+
+  @Test(timeout = 10000)
+  public void testTaskInfoDelay() {
+
+    LlapTaskSchedulerService.LocalityDelayConf localityDelayConf1 =
+        new LlapTaskSchedulerService.LocalityDelayConf(3000);
+
+    ControlledClock clock = new ControlledClock(new MonotonicClock());
+    clock.setTime(clock.getTime());
+
+
+    // With a timeout of 3000.
+    LlapTaskSchedulerService.TaskInfo taskInfo =
+        new LlapTaskSchedulerService.TaskInfo(localityDelayConf1, clock, new Object(), new Object(),
+            mock(Priority.class), mock(Resource.class), null, null, clock.getTime());
+
+    assertFalse(taskInfo.shouldForceLocality());
+
+    assertEquals(3000, taskInfo.getDelay(TimeUnit.MILLISECONDS));
+    assertTrue(taskInfo.shouldDelayForLocality(clock.getTime()));
+
+    clock.setTime(clock.getTime() + 500);
+    assertEquals(2500, taskInfo.getDelay(TimeUnit.MILLISECONDS));
+    assertTrue(taskInfo.shouldDelayForLocality(clock.getTime()));
+
+    clock.setTime(clock.getTime() + 2500);
+    assertEquals(0, taskInfo.getDelay(TimeUnit.MILLISECONDS));
+    assertFalse(taskInfo.shouldDelayForLocality(clock.getTime()));
+
+
+    // No locality delay
+    LlapTaskSchedulerService.LocalityDelayConf localityDelayConf2 =
+        new LlapTaskSchedulerService.LocalityDelayConf(0);
+    taskInfo =
+        new LlapTaskSchedulerService.TaskInfo(localityDelayConf2, clock, new Object(), new Object(),
+            mock(Priority.class), mock(Resource.class), null, null, clock.getTime());
+    assertFalse(taskInfo.shouldDelayForLocality(clock.getTime()));
+    assertFalse(taskInfo.shouldForceLocality());
+    assertTrue(taskInfo.getDelay(TimeUnit.MILLISECONDS) < 0);
+
+    // Force locality
+    LlapTaskSchedulerService.LocalityDelayConf localityDelayConf3 =
+        new LlapTaskSchedulerService.LocalityDelayConf(-1);
+    taskInfo =
+        new LlapTaskSchedulerService.TaskInfo(localityDelayConf3, clock, new Object(), new Object(),
+            mock(Priority.class), mock(Resource.class), null, null, clock.getTime());
+    assertTrue(taskInfo.shouldDelayForLocality(clock.getTime()));
+    assertTrue(taskInfo.shouldForceLocality());
+    assertFalse(taskInfo.getDelay(TimeUnit.MILLISECONDS) < 0);
+  }
+
+  @Test(timeout = 10000)
+  public void testLocalityDelayTaskOrdering() throws InterruptedException, IOException {
+
+    LlapTaskSchedulerService.LocalityDelayConf localityDelayConf =
+        new LlapTaskSchedulerService.LocalityDelayConf(3000);
+
+    ControlledClock clock = new ControlledClock(new MonotonicClock());
+    clock.setTime(clock.getTime());
+
+    DelayQueue<LlapTaskSchedulerService.TaskInfo> delayedQueue = new DelayQueue<>();
+
+    LlapTaskSchedulerService.TaskInfo taskInfo1 =
+        new LlapTaskSchedulerService.TaskInfo(localityDelayConf, clock, new Object(), new Object(),
+            mock(Priority.class), mock(Resource.class), null, null, clock.getTime());
+
+    clock.setTime(clock.getTime() + 1000);
+    LlapTaskSchedulerService.TaskInfo taskInfo2 =
+        new LlapTaskSchedulerService.TaskInfo(localityDelayConf, clock, new Object(), new Object(),
+            mock(Priority.class), mock(Resource.class), null, null, clock.getTime());
+
+    delayedQueue.add(taskInfo1);
+    delayedQueue.add(taskInfo2);
+
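+    // taskInfo1 was created with an earlier clock time, so its localityDelayTimeout is
+    // smaller and the Delayed ordering puts it at the head of the queue.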
+    assertEquals(taskInfo1, delayedQueue.peek());
+  }
+
+  @Test (timeout = 15000)
+  public void testDelayedLocalityNodeCommErrorImmediateAllocation() throws IOException, InterruptedException {
+    Priority priority1 = Priority.newInstance(1);
+    String [] hosts = new String[] {HOST1, HOST2};
+
+    String [] hostsH1 = new String[] {HOST1};
+
+    // Node disable timeout higher than locality delay.
+    TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(20000, hosts, 1, 1, 10000l);
+
+    // Fill up host1 with tasks. Leave host2 empty.
+    try {
+      long startTime = tsWrapper.getClock().getTime();
+      tsWrapper.controlScheduler(true);
+      Object task1 = tsWrapper.allocateTask(hostsH1, priority1);
+      Object task2 = tsWrapper.allocateTask(hostsH1, priority1);
+      Object task3 = tsWrapper.allocateTask(hostsH1, priority1); // 1 more than capacity.
+
+      tsWrapper.awaitLocalTaskAllocations(2);
+
+      verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+      ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
+      verify(tsWrapper.mockAppCallback, times(2))
+          .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
+      assertEquals(2, argumentCaptor.getAllValues().size());
+      assertEquals(task1, argumentCaptor.getAllValues().get(0));
+      assertEquals(task2, argumentCaptor.getAllValues().get(1));
+
+      reset(tsWrapper.mockAppCallback);
+
+      // Mark a task as failed due to a comm failure.
+      tsWrapper.deallocateTask(task1, false, TaskAttemptEndReason.COMMUNICATION_ERROR);
+
+      // Node1 marked as failed, node2 has capacity.
+      // Timeout for nodes is larger than delay - immediate allocation
+      tsWrapper.awaitChangeInTotalAllocations(2);
+
+      long thirdAllocateTime = tsWrapper.getClock().getTime();
+      long diff = thirdAllocateTime - startTime;
+      // The allocation should be immediate, i.e. well within the 10s locality delay.
+      assertTrue("Task not allocated in expected time window: duration=" + diff, diff < 10000l);
+
+      verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+      argumentCaptor = ArgumentCaptor.forClass(Object.class);
+      ArgumentCaptor<Container> containerCaptor = ArgumentCaptor.forClass(Container.class);
+      verify(tsWrapper.mockAppCallback, times(1))
+          .taskAllocated(argumentCaptor.capture(), any(Object.class), containerCaptor.capture());
+      assertEquals(1, argumentCaptor.getAllValues().size());
+      assertEquals(task3, argumentCaptor.getAllValues().get(0));
+      Container assignedContainer = containerCaptor.getValue();
+      assertEquals(HOST2, assignedContainer.getNodeId().getHost());
+
+
+      assertEquals(2, tsWrapper.ts.dagStats.numLocalAllocations);
+      assertEquals(1, tsWrapper.ts.dagStats.numNonLocalAllocations);
+      assertEquals(1, tsWrapper.ts.dagStats.numDelayedAllocations);
+      assertEquals(2, tsWrapper.ts.dagStats.numAllocationsPerHost.get(HOST1).get());
+      assertEquals(1, tsWrapper.ts.dagStats.numAllocationsPerHost.get(HOST2).get());
+
+    } finally {
+      tsWrapper.shutdown();
+    }
+  }
+
+  @Test (timeout = 15000)
+  public void testDelayedLocalityNodeCommErrorDelayedAllocation() throws IOException, InterruptedException {
+    Priority priority1 = Priority.newInstance(1);
+    String [] hosts = new String[] {HOST1, HOST2};
+
+    String [] hostsH1 = new String[] {HOST1};
+
+    TestTaskSchedulerServiceWrapper tsWrapper =
+        new TestTaskSchedulerServiceWrapper(5000, hosts, 1, 1, 10000l, true);
+    LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled
+        delayedTaskSchedulerCallableControlled =
+        (LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled) tsWrapper.ts.delayedTaskSchedulerCallable;
+    ControlledClock clock = tsWrapper.getClock();
+    clock.setTime(clock.getTime());
+
+    // Fill up host1 with tasks. Leave host2 empty.
+    try {
+      tsWrapper.controlScheduler(true);
+      Object task1 = tsWrapper.allocateTask(hostsH1, priority1);
+      Object task2 = tsWrapper.allocateTask(hostsH1, priority1);
+      Object task3 = tsWrapper.allocateTask(hostsH1, priority1); // 1 more than capacity.
+
+      tsWrapper.awaitLocalTaskAllocations(2);
+
+      verify(tsWrapper.mockAppCallback, never()).preemptContainer(any(ContainerId.class));
+      ArgumentCaptor<Object> argumentCaptor = ArgumentCaptor.forClass(Object.class);
+      verify(tsWrapper.mockAppCallback, times(2))
+          .taskAllocated(argumentCaptor.capture(), any(Object.class), any(Container.class));
+      assertEquals(2, argumentCaptor.getAllValues().size());
+      assertEquals(task1, argumentCaptor.getAllValues().get(0));
+      assertEquals(task2, argumentCaptor.getAllValues().get(1));
+
+      reset(tsWrapper.mockAppCallback);
+
+      // Mark a task as failed due to a comm failure.
+      tsWrapper.deallocateTask(task1, false, TaskAttemptEndReason.COMMUNICATION_ERROR);
+
+      // Node1 has free capacity but is disabled. Node2 has capacity. Delay > re-enable timeout.
+      tsWrapper.ensureNoChangeInTotalAllocations(2, 2000l);
+    } finally {
+      tsWrapper.shutdown();
+    }
+  }
+
   private static class TestTaskSchedulerServiceWrapper {
     static final Resource resource = Resource.newInstance(1024, 1);
     Configuration conf;
     TaskSchedulerContext mockAppCallback = mock(TaskSchedulerContext.class);
-    ControlledClock clock = new ControlledClock(new SystemClock());
+    ControlledClock clock = new ControlledClock(new MonotonicClock());
     ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(1000, 1), 1);
     LlapTaskSchedulerServiceForTest ts;
 
@@ -555,14 +1004,21 @@ public class TestLlapTaskSchedulerService {
       this(disableTimeoutMillis, hosts, numExecutors, waitQueueSize, 0l);
     }
 
-    TestTaskSchedulerServiceWrapper(long disableTimeoutMillis, String[] hosts, int numExecutors, int waitQueueSize, long localityDelayMs) throws
+    TestTaskSchedulerServiceWrapper(long nodeDisableTimeoutMillis, String[] hosts, int numExecutors,
+                                    int waitQueueSize, long localityDelayMs) throws
+        IOException, InterruptedException {
+      this(nodeDisableTimeoutMillis, hosts, numExecutors, waitQueueSize, localityDelayMs, false);
+    }
+
+    TestTaskSchedulerServiceWrapper(long nodeDisableTimeoutMillis, String[] hosts, int numExecutors,
+                                    int waitQueueSize, long localityDelayMs, boolean controlledDelayedTaskQueue) throws
         IOException, InterruptedException {
       conf = new Configuration();
       conf.setStrings(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, hosts);
       conf.setInt(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, numExecutors);
       conf.setInt(ConfVars.LLAP_DAEMON_TASK_SCHEDULER_WAIT_QUEUE_SIZE.varname, waitQueueSize);
       conf.set(ConfVars.LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS.varname,
-          disableTimeoutMillis + "ms");
+          nodeDisableTimeoutMillis + "ms");
       conf.setBoolean(LlapFixedRegistryImpl.FIXED_REGISTRY_RESOLVE_HOST_NAMES, false);
       conf.setLong(ConfVars.LLAP_TASK_SCHEDULER_LOCALITY_DELAY.varname, localityDelayMs);
 
@@ -571,7 +1027,11 @@ public class TestLlapTaskSchedulerService {
       UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
       doReturn(userPayload).when(mockAppCallback).getInitialUserPayload();
 
-      ts = new LlapTaskSchedulerServiceForTest(mockAppCallback, clock);
+      if (controlledDelayedTaskQueue) {
+        ts = new LlapTaskSchedulerServiceForTestControlled(mockAppCallback, clock);
+      } else {
+        ts = new LlapTaskSchedulerServiceForTest(mockAppCallback, clock);
+      }
 
       controlScheduler(true);
       ts.initialize();
@@ -582,6 +1042,10 @@ public class TestLlapTaskSchedulerService {
       awaitSchedulerRun();
     }
 
+    ControlledClock getClock() {
+      return clock;
+    }
+
     void controlScheduler(boolean val) {
       ts.forTestsetControlScheduling(val);
     }
@@ -591,8 +1055,19 @@ public class TestLlapTaskSchedulerService {
     }
 
     void awaitSchedulerRun() throws InterruptedException {
-      ts.forTestAwaitSchedulingRun();
+      ts.forTestAwaitSchedulingRun(-1);
+    }
+
+    /**
+     * Waits for the current scheduler run to complete, up to the given timeout.
+     * @param timeoutMs maximum time to wait, in milliseconds
+     * @return false if the timeout elapsed before the scheduler run completed
+     * @throws InterruptedException
+     */
+    boolean awaitSchedulerRun(long timeoutMs) throws InterruptedException {
+      return ts.forTestAwaitSchedulingRun(timeoutMs);
     }
+
     void resetAppCallback() {
       reset(mockAppCallback);
     }
@@ -605,6 +1080,8 @@ public class TestLlapTaskSchedulerService {
       ts.allocateTask(task, resource, hosts, null, priority, null, clientCookie);
     }
 
+
     void deallocateTask(Object task, boolean succeeded, TaskAttemptEndReason endReason) {
       ts.deallocateTask(task, succeeded, endReason, null);
     }
@@ -612,6 +1089,60 @@ public class TestLlapTaskSchedulerService {
     void rejectExecution(Object task) {
       ts.deallocateTask(task, false, TaskAttemptEndReason.EXECUTOR_BUSY, null);
     }
+
+    // More complex methods which may wrap multiple operations
+    Object allocateTask(String[] hosts, Priority priority) {
+      Object task = new Object();
+      Object clientCookie = new Object();
+      allocateTask(task, hosts, priority, clientCookie);
+      return task;
+    }
+
+    public void awaitTotalTaskAllocations(int numTasks) throws InterruptedException {
+      while (true) {
+        signalSchedulerRun();
+        awaitSchedulerRun();
+        if (ts.dagStats.numTotalAllocations == numTasks) {
+          break;
+        }
+      }
+    }
+
+    public void awaitLocalTaskAllocations(int numTasks) throws InterruptedException {
+      while (true) {
+        signalSchedulerRun();
+        awaitSchedulerRun();
+        if (ts.dagStats.numLocalAllocations == numTasks) {
+          break;
+        }
+      }
+    }
+
+    public void awaitChangeInTotalAllocations(int previousAllocations) throws InterruptedException {
+      while (true) {
+        signalSchedulerRun();
+        awaitSchedulerRun();
+        if (ts.dagStats.numTotalAllocations > previousAllocations) {
+          break;
+        }
+        Thread.sleep(200l);
+      }
+    }
+
+    public void ensureNoChangeInTotalAllocations(int previousAllocations, long timeout) throws
+        InterruptedException {
+      long startTime = Time.monotonicNow();
+      long timeLeft = timeout;
+      while (timeLeft > 0) {
+        signalSchedulerRun();
+        awaitSchedulerRun(Math.min(200, timeLeft));
+        if (ts.dagStats.numTotalAllocations != previousAllocations) {
+          throw new IllegalStateException("NumTotalAllocations expected to stay at "
+              + previousAllocations + ". Actual=" + ts.dagStats.numTotalAllocations);
+        }
+        timeLeft = (startTime + timeout) - Time.monotonicNow();
+      }
+    }
   }
 
   private static class LlapTaskSchedulerServiceForTest extends LlapTaskSchedulerService {
@@ -632,6 +1163,7 @@ public class TestLlapTaskSchedulerService {
 
     @Override
     protected void schedulePendingTasks() {
+      LOG.info("Attempted schedulPendingTasks");
       testLock.lock();
       try {
         if (controlScheduling.get()) {
@@ -668,17 +1200,143 @@ public class TestLlapTaskSchedulerService {
       }
     }
 
-    void forTestAwaitSchedulingRun() throws InterruptedException {
+    boolean forTestAwaitSchedulingRun(long timeout) throws InterruptedException {
       testLock.lock();
       try {
+        boolean success = true;
         while (!schedulingComplete) {
-          schedulingCompleteCondition.await();
+          if (timeout == -1) {
+            schedulingCompleteCondition.await();
+          } else {
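+            // Condition.await(time, unit) returns false if the waiting time elapsed.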
+            success = schedulingCompleteCondition.await(timeout, TimeUnit.MILLISECONDS);
+            break;
+          }
         }
         schedulingComplete = false;
+        return success;
       } finally {
         testLock.unlock();
       }
     }
 
   }
+
+  private static class LlapTaskSchedulerServiceForTestControlled extends LlapTaskSchedulerServiceForTest {
+
+    private DelayedTaskSchedulerCallableControlled controlledTSCallable;
+
+    public LlapTaskSchedulerServiceForTestControlled(
+        TaskSchedulerContext appClient, Clock clock) {
+      super(appClient, clock);
+    }
+
+    @Override
+    LlapTaskSchedulerService.DelayedTaskSchedulerCallable createDelayedTaskSchedulerCallable() {
+      controlledTSCallable = new DelayedTaskSchedulerCallableControlled();
+      return controlledTSCallable;
+    }
+
+    class DelayedTaskSchedulerCallableControlled extends DelayedTaskSchedulerCallable {
+      private final ReentrantLock lock = new ReentrantLock();
+      private final Condition triggerRunCondition = lock.newCondition();
+      private boolean shouldRun = false;
+      private final Condition runCompleteCondition = lock.newCondition();
+      private boolean runComplete = false;
+
+      static final int STATE_NOT_RUN = 0;
+      static final int STATE_NULL_FOUND = 1;
+      static final int STATE_TIMEOUT_NOT_EXPIRED = 2;
+      static final int STATE_RETURNED_TASK = 3;
+
+      volatile int lastState = STATE_NOT_RUN;
+
+      volatile boolean lastShouldScheduleTaskResult = false;
+      volatile boolean shouldScheduleTaskTriggered = false;
+
+      @Override
+      public void processEvictedTask(TaskInfo taskInfo) {
+        super.processEvictedTask(taskInfo);
+        signalRunComplete();
+      }
+
+      @Override
+      public TaskInfo getNextTask() throws InterruptedException {
+
+        while (true) {
+          lock.lock();
+          try {
+            while (!shouldRun) {
+              triggerRunCondition.await();
+            }
+            // Prevent subsequent runs until a new trigger is set.
+            shouldRun = false;
+          } finally {
+            lock.unlock();
+          }
+          TaskInfo taskInfo = delayedTaskQueue.peek();
+          if (taskInfo == null) {
+            LOG.info("Triggered getTask but the queue is empty");
+            lastState = STATE_NULL_FOUND;
+            signalRunComplete();
+            continue;
+          }
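+          // The head of the queue exists; check whether its locality delay has expired.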
+          if (taskInfo.shouldDelayForLocality(
+              LlapTaskSchedulerServiceForTestControlled.this.clock.getTime())) {
+            LOG.info("Triggered getTask but the first element is not ready to execute");
+            lastState = STATE_TIMEOUT_NOT_EXPIRED;
+            signalRunComplete();
+            continue;
+          } else {
+            delayedTaskQueue.poll(); // Remove the previously peeked element.
+            lastState = STATE_RETURNED_TASK;
+            return taskInfo;
+          }
+        }
+      }
+
+      @Override
+      public boolean shouldScheduleTask(TaskInfo taskInfo) {
+        shouldScheduleTaskTriggered = true;
+        lastShouldScheduleTaskResult = super.shouldScheduleTask(taskInfo);
+        return lastShouldScheduleTaskResult;
+      }
+
+      void resetShouldScheduleInformation() {
+        shouldScheduleTaskTriggered = false;
+        lastShouldScheduleTaskResult = false;
+      }
+
+      private void signalRunComplete() {
+        lock.lock();
+        try {
+          runComplete = true;
+          runCompleteCondition.signal();
+        } finally {
+          lock.unlock();
+        }
+      }
+
+      void triggerGetNextTask() {
+        lock.lock();
+        try {
+          shouldRun = true;
+          triggerRunCondition.signal();
+        } finally {
+          lock.unlock();
+        }
+      }
+
+      void awaitGetNextTaskProcessing() throws InterruptedException {
+        lock.lock();
+        try {
+          while (!runComplete) {
+            runCompleteCondition.await();
+          }
+          runComplete = false;
+        } finally {
+          lock.unlock();
+        }
+      }
+    }
+  }
 }
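
As an aside on the controlled scheduler above: with controlledDelayedTaskQueue=true, a test
can step the delayed-task loop one pass at a time. A minimal sketch, assuming the wrapper is
started as in the tests above (the host name and assertion are illustrative, not part of the
commit):

    // Drive exactly one pass of the delayed-task scheduler loop.
    TestTaskSchedulerServiceWrapper tsWrapper = new TestTaskSchedulerServiceWrapper(
        10000l, new String[]{"host1"}, 1, 1, 10000l, true); // controlledDelayedTaskQueue=true
    try {
      LlapTaskSchedulerServiceForTestControlled.DelayedTaskSchedulerCallableControlled tsCallable =
          ((LlapTaskSchedulerServiceForTestControlled) tsWrapper.ts).controlledTSCallable;
      tsCallable.triggerGetNextTask();         // wake getNextTask() for a single pass
      tsCallable.awaitGetNextTaskProcessing(); // block until that pass has been handled
      // With an empty delayedTaskQueue, the pass should record STATE_NULL_FOUND.
      assertEquals(LlapTaskSchedulerServiceForTestControlled
          .DelayedTaskSchedulerCallableControlled.STATE_NULL_FOUND, tsCallable.lastState);
    } finally {
      tsWrapper.shutdown();
    }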


[06/11] hive git commit: HIVE-13447 : LLAP: check ZK acls for registry and fail if they are too permissive (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Posted by jd...@apache.org.
HIVE-13447 : LLAP: check ZK acls for registry and fail if they are too permissive (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1289aff1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1289aff1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1289aff1

Branch: refs/heads/llap
Commit: 1289aff1adbff9f35032d4a6ed074207fe1eb4de
Parents: 134b6cc
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Apr 29 10:55:48 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Apr 29 10:55:48 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  3 +
 .../impl/LlapZookeeperRegistryImpl.java         | 74 +++++++++++++++-----
 .../hadoop/hive/ql/exec/tez/DagUtils.java       |  1 -
 3 files changed, 60 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/1289aff1/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index fd725cb..db4b9e8 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2803,6 +2803,9 @@ public class HiveConf extends Configuration {
         false,
         "Whether to setup split locations to match nodes on which llap daemons are running," +
             " instead of using the locations provided by the split itself"),
+    LLAP_VALIDATE_ACLS("hive.llap.validate.acls", true,
+        "Whether LLAP should reject permissive ACLs in some cases (e.g. its own management\n" +
+        "protocol or ZK paths), similar to how ssh refuses a key with bad access permissions."),
 
     SPARK_CLIENT_FUTURE_TIMEOUT("hive.spark.client.future.timeout",
       "60s", new TimeValidator(TimeUnit.SECONDS),

http://git-wip-us.apache.org/repos/asf/hive/blob/1289aff1/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index d51249a..6981061 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -70,10 +70,12 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.client.ZooKeeperSaslClient;
 import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 public class LlapZookeeperRegistryImpl implements ServiceRegistry {
@@ -88,10 +90,13 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
   private static final String IPC_SHUFFLE = "shuffle";
   private static final String IPC_LLAP = "llap";
   private final static String ROOT_NAMESPACE = "llap";
+  private final static String USER_SCOPE_PATH_PREFIX = "user-";
 
   private final Configuration conf;
   private final CuratorFramework zooKeeperClient;
-  private final String pathPrefix;
+  private final String pathPrefix, userPathPrefix;
+  private String userNameFromPrincipal; // Only set when setting up the secure config for ZK.
+
   private PersistentEphemeralNode znode;
   private String znodePath; // unique identity for this instance
   private final ServiceRecordMarshal encoder; // to marshal/unmarshal znode data
@@ -125,23 +130,23 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
 
     @Override
     public List<ACL> getDefaultAcl() {
-      List<ACL> nodeAcls = new ArrayList<ACL>();
-      if (UserGroupInformation.isSecurityEnabled()) {
-        // Read all to the world
-        nodeAcls.addAll(ZooDefs.Ids.READ_ACL_UNSAFE);
-        // Create/Delete/Write/Admin to the authenticated user
-        nodeAcls.add(new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.AUTH_IDS));
-      } else {
-        // ACLs for znodes on a non-kerberized cluster
-        // Create/Read/Delete/Write/Admin to the world
-        nodeAcls.addAll(ZooDefs.Ids.OPEN_ACL_UNSAFE);
-      }
-      return nodeAcls;
+      // We always return something from getAclForPath so this should not happen.
+      LOG.warn("getDefaultAcl was called");
+      return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE);
     }
 
     @Override
     public List<ACL> getAclForPath(String path) {
-      return getDefaultAcl();
+      if (!UserGroupInformation.isSecurityEnabled() || path == null
+          || !path.contains(userPathPrefix)) {
+        // No security or the path is below the user path - full access.
+        return Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE);
+      }
+      // Read all to the world
+      List<ACL> nodeAcls = new ArrayList<ACL>(ZooDefs.Ids.READ_ACL_UNSAFE);
+      // Create/Delete/Write/Admin to creator
+      nodeAcls.addAll(ZooDefs.Ids.CREATOR_ALL_ACL);
+      return nodeAcls;
     }
   };
 
@@ -172,7 +177,8 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
     // worker does not respond due to communication interruptions it will retain the same sequence
     // number when it returns back. If session timeout expires, the node will be deleted and new
     // addition of the same node (restart) will get next sequence number
-    this.pathPrefix = "/" + RegistryUtils.currentUser() + "/" + instanceName + "/workers/worker-";
+    this.userPathPrefix = USER_SCOPE_PATH_PREFIX + RegistryUtils.currentUser();
+    this.pathPrefix = "/" + userPathPrefix + "/" + instanceName + "/workers/worker-";
     this.instancesCache = null;
     this.instances = null;
     this.stateChangeListeners = new HashSet<>();
@@ -276,9 +282,11 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
       }
 
       znodePath = znode.getActualPath();
+      if (HiveConf.getBoolVar(conf, ConfVars.LLAP_VALIDATE_ACLS)) {
+        checkAcls();
+      }
       // Set a watch on the znode
-      if (zooKeeperClient.checkExists()
-          .forPath(znodePath) == null) {
+      if (zooKeeperClient.checkExists().forPath(znodePath) == null) {
         // No node exists, throw exception
         throw new Exception("Unable to create znode for this LLAP instance on ZooKeeper.");
       }
@@ -297,6 +305,31 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
     return uniq.toString();
   }
 
+  private void checkAcls() throws Exception {
+    if (!UserGroupInformation.isSecurityEnabled()) return;
+    String pathToCheck = znodePath;
+    // We are trying to check ACLs on the "workers" directory, which no one except us should be
+    // able to write to. Higher-level directories shouldn't matter - we don't read them.
+    int ix = pathToCheck.lastIndexOf('/');
+    if (ix > 0) {
+      pathToCheck = pathToCheck.substring(0, ix);
+    }
+    List<ACL> acls = zooKeeperClient.usingNamespace(null).getACL().forPath(pathToCheck);
+    if (acls == null || acls.isEmpty()) {
+      // Shouldn't happen (we had enough access to read the ACLs); treat an empty list as wide open.
+      throw new SecurityException("No ACLs on " + pathToCheck);
+    }
+    // This could be brittle.
+    assert userNameFromPrincipal != null;
+    Id currentUser = new Id("sasl", userNameFromPrincipal);
+    for (ACL acl : acls) {
+      if ((acl.getPerms() & ~ZooDefs.Perms.READ) == 0 || currentUser.equals(acl.getId())) {
+        continue; // Read permission/no permissions, or the expected user.
+      }
+      throw new SecurityException("The ACL " + acl + " is unnacceptable for " + pathToCheck);
+    }
+  }
+
   @Override
   public void unregister() throws IOException {
     // Nothing for the zkCreate models
@@ -643,6 +676,7 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
     System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, SASL_LOGIN_CONTEXT_NAME);
 
     principal = SecurityUtil.getServerPrincipal(principal, "0.0.0.0");
+    userNameFromPrincipal = getUserNameFromPrincipal(principal);
     JaasConfiguration jaasConf = new JaasConfiguration(SASL_LOGIN_CONTEXT_NAME, principal,
         keyTabFile);
 
@@ -650,6 +684,12 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
     javax.security.auth.login.Configuration.setConfiguration(jaasConf);
   }
 
+  private String getUserNameFromPrincipal(String principal) {
+    // Based on SecurityUtil.
+    String[] components = principal.split("[/@]");
+    return (components == null || components.length != 3) ? principal : components[0];
+  }
+
   /**
    * A JAAS configuration for ZooKeeper clients intended to use for SASL
    * Kerberos.
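
To make the rule in checkAcls() visible at a glance, here is a condensed, self-contained
sketch of the same check (illustrative only; validateAcls, expectedUser and workersPath are
hypothetical names, not part of the commit):

    import java.util.List;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.data.ACL;
    import org.apache.zookeeper.data.Id;

    // Reject the path if anyone other than the expected SASL user holds
    // more than READ permission on it.
    static void validateAcls(List<ACL> acls, String expectedUser, String workersPath) {
      if (acls == null || acls.isEmpty()) {
        throw new SecurityException("No ACLs on " + workersPath);
      }
      Id expected = new Id("sasl", expectedUser);
      for (ACL acl : acls) {
        boolean readOnly = (acl.getPerms() & ~ZooDefs.Perms.READ) == 0;
        if (!readOnly && !expected.equals(acl.getId())) {
          throw new SecurityException("The ACL " + acl + " is unacceptable for " + workersPath);
        }
      }
    }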

http://git-wip-us.apache.org/repos/asf/hive/blob/1289aff1/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 8aca779..79da860 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -577,7 +577,6 @@ public class DagUtils {
       }
     }
 
-    // TODO# HERE?
     if (mapWork instanceof MergeFileWork) {
       Path outputPath = ((MergeFileWork) mapWork).getOutputDir();
       // prepare the tmp output directory. The output tmp directory should


[09/11] hive git commit: HIVE-13654: Add JAVA8_URL to jenkins-submit-build.sh

Posted by jd...@apache.org.
HIVE-13654: Add JAVA8_URL to jenkins-submit-build.sh


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9179178e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9179178e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9179178e

Branch: refs/heads/llap
Commit: 9179178e46420cb829f489a8ba59733554d8de4c
Parents: 324a2c6
Author: Sergio Pena <se...@cloudera.com>
Authored: Fri Apr 29 16:07:55 2016 -0500
Committer: Sergio Pena <se...@cloudera.com>
Committed: Fri Apr 29 16:07:55 2016 -0500

----------------------------------------------------------------------
 dev-support/jenkins-common.sh       | 14 ++++++++++++++
 dev-support/jenkins-submit-build.sh |  4 ++++
 2 files changed, 18 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/9179178e/dev-support/jenkins-common.sh
----------------------------------------------------------------------
diff --git a/dev-support/jenkins-common.sh b/dev-support/jenkins-common.sh
index 473fd16..dc98db5 100644
--- a/dev-support/jenkins-common.sh
+++ b/dev-support/jenkins-common.sh
@@ -101,3 +101,17 @@ process_jira() {
 patch_contains_hms_upgrade() {
 	curl -s "$1" | grep "^diff.*metastore/scripts/upgrade/" >/dev/null
 }
+
+string_to_upper_case() {
+  local str="$1"
+
+  echo "$str" | tr '[:lower:]' '[:upper:]'
+}
+
+get_jenkins_job_url() {
+  local branch="$1"
+  local varname=`string_to_upper_case $branch`_URL
+  local joburl=`eval echo \\$${varname}`
+
+  echo $joburl
+}
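
A note on the helper above (not part of the commit): get_jenkins_job_url resolves a branch
name through eval-based indirection, so branch "master", for example, maps to the contents of
$MASTER_URL. A profile such as "java8-mr2" uppercases to "JAVA8-MR2", which is not a valid
shell variable name, which is presumably why jenkins-submit-build.sh below wires up JAVA8_URL
as an explicit case instead.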

http://git-wip-us.apache.org/repos/asf/hive/blob/9179178e/dev-support/jenkins-submit-build.sh
----------------------------------------------------------------------
diff --git a/dev-support/jenkins-submit-build.sh b/dev-support/jenkins-submit-build.sh
index a145e00..d055c38 100644
--- a/dev-support/jenkins-submit-build.sh
+++ b/dev-support/jenkins-submit-build.sh
@@ -48,6 +48,10 @@ case "$BUILD_PROFILE" in
    test -n "$BEELINE_CLI_URL" || fail "BEELINE_CLI_URL must be specified"
    url="$BEELINE_CLI_URL&ISSUE_NUM=$ISSUE_NUM"
   ;;
+  java8-mr2)
+   test -n "$JAVA8_URL" || fail "JAVA8_URL must be specified"
+   url="$JAVA8_URL&ISSUE_NUM=$ISSUE_NUM"
+  ;;
   *)
   echo "Unknown profile '$BUILD_PROFILE'"
   exit 1