Posted to commits@hive.apache.org by br...@apache.org on 2014/10/14 23:24:13 UTC

svn commit: r1631891 - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ itests/hive-unit-hadoop2/ itests/hive-unit-hadoop2/src/test/java/org/apache/hive/ itests/hive-unit-hadoop2/src/test/java/org/apache/hive/jdbc/ service/src/java/org/apac...

Author: brock
Date: Tue Oct 14 21:24:12 2014
New Revision: 1631891

URL: http://svn.apache.org/r1631891
Log:
HIVE-8424 - Support fair scheduler user queue mapping in non-impersonation mode (Mohit Sabharwal via Brock)

Added:
    hive/trunk/itests/hive-unit-hadoop2/src/test/java/org/apache/hive/
    hive/trunk/itests/hive-unit-hadoop2/src/test/java/org/apache/hive/jdbc/
    hive/trunk/itests/hive-unit-hadoop2/src/test/java/org/apache/hive/jdbc/TestSchedulerQueue.java
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/itests/hive-unit-hadoop2/pom.xml
    hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
    hive/trunk/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
    hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
    hive/trunk/shims/0.23/pom.xml
    hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
    hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1631891&r1=1631890&r2=1631891&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Tue Oct 14 21:24:12 2014
@@ -1625,6 +1625,12 @@ public class HiveConf extends Configurat
     HIVE_SERVER2_USE_SSL("hive.server2.use.SSL", false, ""),
     HIVE_SERVER2_SSL_KEYSTORE_PATH("hive.server2.keystore.path", "", ""),
     HIVE_SERVER2_SSL_KEYSTORE_PASSWORD("hive.server2.keystore.password", "", ""),
+    HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE("hive.server2.map.fair.scheduler.queue", true,
+        "If the YARN fair scheduler is configured and HiveServer2 is running in non-impersonation mode,\n" +
+        "this setting determines the user for fair scheduler queue mapping.\n" +
+        "If set to true (default), the logged-in user determines the fair scheduler queue\n" +
+        "for submitted jobs, so that map reduce resource usage can be tracked by user.\n" +
+        "If set to false, all Hive jobs go to the 'hive' user's queue."),
 
     HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,list,delete,reload,compile",
         "Comma separated list of non-SQL Hive commands users are authorized to execute"),

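The new flag only takes effect when HiveServer2 has impersonation disabled and YARN is running the fair scheduler. A minimal sketch of that combination of settings, mirroring the MiniHS2 setup in TestSchedulerQueue below (the standalone class name is illustrative only, not part of this commit):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class QueueMappingConfSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Non-impersonation mode: queries run as the HiveServer2 service user.
        conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
        // New flag added by this commit: map the fair scheduler queue to the logged-in user.
        conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE, true);
        // Queue mapping is only attempted when YARN uses the fair scheduler.
        conf.set(YarnConfiguration.RM_SCHEDULER,
            "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");
        System.out.println(conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE));
      }
    }
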
Modified: hive/trunk/itests/hive-unit-hadoop2/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hive-unit-hadoop2/pom.xml?rev=1631891&r1=1631890&r2=1631891&view=diff
==============================================================================
--- hive/trunk/itests/hive-unit-hadoop2/pom.xml (original)
+++ hive/trunk/itests/hive-unit-hadoop2/pom.xml Tue Oct 14 21:24:12 2014
@@ -118,6 +118,12 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-it-unit</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
     <!-- hadoop-2 dependencies -->
     <dependency>
       <groupId>org.apache.hadoop</groupId>

Added: hive/trunk/itests/hive-unit-hadoop2/src/test/java/org/apache/hive/jdbc/TestSchedulerQueue.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hive-unit-hadoop2/src/test/java/org/apache/hive/jdbc/TestSchedulerQueue.java?rev=1631891&view=auto
==============================================================================
--- hive/trunk/itests/hive-unit-hadoop2/src/test/java/org/apache/hive/jdbc/TestSchedulerQueue.java (added)
+++ hive/trunk/itests/hive-unit-hadoop2/src/test/java/org/apache/hive/jdbc/TestSchedulerQueue.java Tue Oct 14 21:24:12 2014
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.jdbc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.HashMap;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestSchedulerQueue {
+
+  private MiniHS2 miniHS2 = null;
+  private static HiveConf conf = new HiveConf();
+  private Connection hs2Conn = null;
+
+  @BeforeClass
+  public static void beforeTest() throws Exception {
+    Class.forName(MiniHS2.getJdbcDriverName());
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    DriverManager.setLoginTimeout(0);
+    miniHS2 = new MiniHS2(conf, true);
+    miniHS2.setConfProperty(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname, "false");
+    miniHS2.setConfProperty(HiveConf.ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE.varname,
+        "true");
+    miniHS2.setConfProperty(YarnConfiguration.RM_SCHEDULER,
+        "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");
+    miniHS2.start(new HashMap<String, String>());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (hs2Conn != null) {
+      hs2Conn.close();
+    }
+    if (miniHS2 != null && miniHS2.isStarted()) {
+      miniHS2.stop();
+    }
+    System.clearProperty("mapreduce.job.queuename");
+  }
+
+  /**
+   * Verify:
+   *  Test is running with MR2 and queue mapping defaults are set.
+   *  Queue mapping is set for the connected user.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testFairSchedulerQueueMapping() throws Exception {
+    hs2Conn = DriverManager.getConnection(miniHS2.getJdbcURL(), "user1", "bar");
+    verifyProperty("mapreduce.framework.name", "yarn");
+    verifyProperty(HiveConf.ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE.varname,
+        "true");
+    verifyProperty(YarnConfiguration.RM_SCHEDULER,
+        "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");
+    verifyProperty("mapreduce.job.queuename", "root.user1");
+  }
+
+  /**
+   * Verify that the queue refresh doesn't happen when configured to be off.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testQueueMappingCheckDisabled() throws Exception {
+    miniHS2.setConfProperty(
+        HiveConf.ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE.varname, "false");
+    hs2Conn = DriverManager.getConnection(miniHS2.getJdbcURL(), "user1", "bar");
+    verifyProperty(HiveConf.ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE.varname,
+        "false");
+    verifyProperty("mapreduce.job.queuename", YarnConfiguration.DEFAULT_QUEUE_NAME);
+  }
+
+  /**
+   * Verify that the given property contains the expected value.
+   *
+   * @param propertyName
+   * @param expectedValue
+   * @throws Exception
+   */
+  private void verifyProperty(String propertyName, String expectedValue) throws Exception {
+    Statement stmt = hs2Conn.createStatement();
+    ResultSet res = stmt.executeQuery("set " + propertyName);
+    assertTrue(res.next());
+    String results[] = res.getString(1).split("=");
+    assertEquals("Property should be set", results.length, 2);
+    assertEquals("Property should be set", expectedValue, results[1]);
+  }
+}

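On a running cluster, the same check the test performs can be made from any HiveServer2 JDBC client. A hedged sketch (host, port, user, and class name below are placeholders, not from this commit):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class QueueMappingJdbcCheck {
      public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        // Placeholder endpoint and credentials; substitute your HiveServer2 instance.
        try (Connection conn = DriverManager.getConnection(
                 "jdbc:hive2://localhost:10000/default", "user1", "");
             Statement stmt = conn.createStatement();
             ResultSet res = stmt.executeQuery("set mapreduce.job.queuename")) {
          // With the new flag enabled, the value is expected to be root.<user>, e.g. root.user1.
          while (res.next()) {
            System.out.println(res.getString(1));
          }
        }
      }
    }
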
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java?rev=1631891&r1=1631890&r2=1631891&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java Tue Oct 14 21:24:12 2014
@@ -43,6 +43,7 @@ import org.apache.hadoop.hive.ql.exec.Li
 import org.apache.hadoop.hive.ql.history.HiveHistory;
 import org.apache.hadoop.hive.ql.processors.SetProcessor;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hive.common.util.HiveVersionInfo;
 import org.apache.hive.service.auth.HiveAuthFactory;
 import org.apache.hive.service.cli.*;
@@ -95,6 +96,17 @@ public class HiveSessionImpl implements 
     this.hiveConf = new HiveConf(serverhiveConf);
     this.ipAddress = ipAddress;
 
+    try {
+      // In non-impersonation mode, map scheduler queue to current user
+      // if fair scheduler is configured.
+      if (! hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS) &&
+        hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_MAP_FAIR_SCHEDULER_QUEUE)) {
+        ShimLoader.getHadoopShims().refreshDefaultQueue(hiveConf, username);
+      }
+    } catch (IOException e) {
+      LOG.warn("Error setting scheduler queue: " + e, e);
+    }
+
     // Set an explicit session name to control the download directory name
     hiveConf.set(ConfVars.HIVESESSIONID.varname,
         sessionHandle.getHandleIdentifier().toString());

Modified: hive/trunk/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java?rev=1631891&r1=1631890&r2=1631891&view=diff
==============================================================================
--- hive/trunk/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (original)
+++ hive/trunk/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java Tue Oct 14 21:24:12 2014
@@ -825,6 +825,11 @@ public class Hadoop20Shims implements Ha
   }
 
   @Override
+  public void refreshDefaultQueue(Configuration conf, String userName) {
+    // MR1 does not expose API required to set MR queue mapping for user
+  }
+
+  @Override
   public String getTokenFileLocEnvName() {
     throw new UnsupportedOperationException(
         "Kerberos not supported in current hadoop version");

Modified: hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java?rev=1631891&r1=1631890&r2=1631891&view=diff
==============================================================================
--- hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java (original)
+++ hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java Tue Oct 14 21:24:12 2014
@@ -159,6 +159,11 @@ public class Hadoop20SShims extends Hado
   }
 
   @Override
+  public void refreshDefaultQueue(Configuration conf, String userName) {
+    // MR1 does not expose API required to set MR queue mapping for user
+  }
+
+  @Override
   public void setTotalOrderPartitionFile(JobConf jobConf, Path partitionFile){
     TotalOrderPartitioner.setPartitionFile(jobConf, partitionFile);
   }

Modified: hive/trunk/shims/0.23/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/shims/0.23/pom.xml?rev=1631891&r1=1631890&r2=1631891&view=diff
==============================================================================
--- hive/trunk/shims/0.23/pom.xml (original)
+++ hive/trunk/shims/0.23/pom.xml Tue Oct 14 21:24:12 2014
@@ -120,6 +120,11 @@
      <optional>true</optional>
    </dependency>
    <dependency>
+     <groupId>org.apache.hadoop</groupId>
+     <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
+     <version>${hadoop-23.version}</version>
+   </dependency>
+   <dependency>
      <groupId>org.apache.tez</groupId>
      <artifactId>tez-tests</artifactId>
      <version>${tez.version}</version>

Modified: hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java?rev=1631891&r1=1631890&r2=1631891&view=diff
==============================================================================
--- hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (original)
+++ hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java Tue Oct 14 21:24:12 2014
@@ -75,6 +75,10 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.authentication.util.KerberosName;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementPolicy;
 import org.apache.tez.test.MiniTezCluster;
 
 import com.google.common.base.Joiner;
@@ -86,6 +90,7 @@ import com.google.common.collect.Iterabl
  * Implementation of shims against Hadoop 0.23.0.
  */
 public class Hadoop23Shims extends HadoopShimsSecure {
+  private static final String MR2_JOB_QUEUE_PROPERTY = "mapreduce.job.queuename";
 
   HadoopShims.MiniDFSShim cluster = null;
 
@@ -221,6 +226,30 @@ public class Hadoop23Shims extends Hadoo
   }
 
   /**
+   * Load the fair scheduler queue for the given user if available.
+   */
+  @Override
+  public void refreshDefaultQueue(Configuration conf, String userName) throws IOException {
+    String requestedQueue = YarnConfiguration.DEFAULT_QUEUE_NAME;
+    if (StringUtils.isNotBlank(userName) && isFairScheduler(conf)) {
+      AllocationConfiguration allocConf = new AllocationConfiguration(conf);
+      QueuePlacementPolicy queuePolicy = allocConf.getPlacementPolicy();
+      if (queuePolicy != null) {
+        requestedQueue = queuePolicy.assignAppToQueue(requestedQueue, userName);
+        if (StringUtils.isNotBlank(requestedQueue)) {
+          LOG.debug("Setting queue name to " + requestedQueue + " for user " + userName);
+          conf.set(MR2_JOB_QUEUE_PROPERTY, requestedQueue);
+        }
+      }
+    }
+  }
+
+  private boolean isFairScheduler(Configuration conf) {
+    return FairScheduler.class.getName().
+        equalsIgnoreCase(conf.get(YarnConfiguration.RM_SCHEDULER));
+  }
+
+  /**
    * Returns a shim to wrap MiniMrCluster
    */
   @Override

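To make the new shim contract concrete, a sketch of invoking it directly (assumes a Hadoop 2 classpath with the Hive shims and the fair scheduler's default placement rules, under which TestSchedulerQueue above expects user1 to map to root.user1; the standalone class name is illustrative only, not part of this commit):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.shims.ShimLoader;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class RefreshQueueSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // refreshDefaultQueue() is a no-op unless the RM scheduler is the fair scheduler.
        conf.set(YarnConfiguration.RM_SCHEDULER,
            "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler");
        // On Hadoop 2 this resolves to Hadoop23Shims, which consults the fair scheduler's
        // QueuePlacementPolicy and rewrites mapreduce.job.queuename for the given user.
        ShimLoader.getHadoopShims().refreshDefaultQueue(conf, "user1");
        // With default placement rules the test above expects root.user1 here.
        System.out.println(conf.get("mapreduce.job.queuename"));
      }
    }
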
Modified: hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1631891&r1=1631890&r2=1631891&view=diff
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java (original)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java Tue Oct 14 21:24:12 2014
@@ -366,6 +366,15 @@ public interface HadoopShims {
   public short getDefaultReplication(FileSystem fs, Path path);
 
   /**
+   * Reset the default fair scheduler queue mapping to the end user.
+   *
+   * @param conf
+   * @param userName end user name
+   */
+  public void refreshDefaultQueue(Configuration conf, String userName)
+      throws IOException;
+
+  /**
    * Create the proxy ugi for the given userid
    * @param userName
    * @return