You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/02/14 11:40:20 UTC

svn commit: r1568231 - in /hive/branches/tez: common/src/java/org/apache/hadoop/hive/conf/ conf/ data/conf/tez/ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ ql/src/test/org/apache/hadoop/hive/ql/session/

Author: gunther
Date: Fri Feb 14 10:40:19 2014
New Revision: 1568231

URL: http://svn.apache.org/r1568231
Log:
HIVE-6391: Use pre-warm APIs in Tez to improve hive query startup (Gopal V via Gunther Hagleitner)

Added:
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java
Modified:
    hive/branches/tez/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/branches/tez/conf/hive-default.xml.template
    hive/branches/tez/data/conf/tez/hive-site.xml
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
    hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/session/TestSessionState.java

Modified: hive/branches/tez/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1568231&r1=1568230&r2=1568231&view=diff
==============================================================================
--- hive/branches/tez/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/tez/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Fri Feb 14 10:40:19 2014
@@ -886,6 +886,9 @@ public class HiveConf extends Configurat
     // Whether to generate the splits locally or in the AM (tez only)
     HIVE_AM_SPLIT_GENERATION("hive.compute.splits.in.am", true),
 
+    HIVE_PREWARM_ENABLED("hive.prewarm.enabled", false),
+    HIVE_PREWARM_NUM_CONTAINERS("hive.prewarm.numcontainers", 10),
+
     // none, idonly, traverse, execution
     HIVESTAGEIDREARRANGE("hive.stageid.rearrange", "none"),
     HIVEEXPLAINDEPENDENCYAPPENDTASKTYPES("hive.explain.dependency.append.tasktype", false),

Modified: hive/branches/tez/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/branches/tez/conf/hive-default.xml.template?rev=1568231&r1=1568230&r2=1568231&view=diff
==============================================================================
--- hive/branches/tez/conf/hive-default.xml.template (original)
+++ hive/branches/tez/conf/hive-default.xml.template Fri Feb 14 10:40:19 2014
@@ -2168,6 +2168,22 @@
 </property>
 
 <property>
+  <name>hive.prewarm.enabled</name>
+  <value>false</value>
+  <description>
+    Enables container pre-warming for Tez (Hadoop 2 only).
+  </description>
+</property>
+
+<property>
+  <name>hive.prewarm.numcontainers</name>
+  <value>10</value>
+  <description>
+    Controls the number of containers to pre-warm for Tez (Hadoop 2 only).
+  </description>
+</property>
+
+<property>
   <name>hive.server2.table.type.mapping</name>
   <value>CLASSIC</value>
   <description>

Modified: hive/branches/tez/data/conf/tez/hive-site.xml
URL: http://svn.apache.org/viewvc/hive/branches/tez/data/conf/tez/hive-site.xml?rev=1568231&r1=1568230&r2=1568231&view=diff
==============================================================================
Binary files - no diff available.

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1568231&r1=1568230&r2=1568231&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Fri Feb 14 10:40:19 2014
@@ -86,6 +86,10 @@ import org.apache.tez.dag.api.InputDescr
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.client.PreWarmContext;
+import org.apache.tez.client.TezSessionConfiguration;
 import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
@@ -407,6 +411,49 @@ public class DagUtils {
   }
 
   /**
+   * Assembles the pre-warm context handed to the Tez session.
+   *
+   * @param sessionConfig session configuration
+   * @param numContainers number of containers to pre-warm
+   * @param localResources additional resources to pre-warm with, may be null
+   * @return prewarm context object
+   * @throws IOException also raised in place of a LoginException from localizing temp files
+   */
+  public PreWarmContext createPreWarmContext(TezSessionConfiguration sessionConfig, int numContainers,
+               Map<String, LocalResource> localResources) throws IOException, TezException {
+    Configuration tezConf = sessionConfig.getTezConfiguration();
+
+    // the pre-warm task is a HivePreWarmProcessor that receives the tez configuration as payload
+    ProcessorDescriptor descriptor = new ProcessorDescriptor(HivePreWarmProcessor.class.getName());
+    descriptor.setUserPayload(MRHelpers.createUserPayloadFromConf(tezConf));
+    PreWarmContext preWarm = new PreWarmContext(descriptor, MRHelpers.getMapResource(tezConf),
+        new VertexLocationHint(numContainers, null));
+
+    // later entries win: session resources, then localized temp files, then the caller's
+    Map<String, LocalResource> resources = new HashMap<String, LocalResource>();
+    resources.putAll(sessionConfig.getSessionResources());
+    try {
+      for (LocalResource tempFile : localizeTempFiles(tezConf)) {
+        resources.put(getBaseName(tempFile), tempFile);
+      }
+    } catch (LoginException e) {
+      throw new IOException(e);
+    }
+
+    if (localResources != null) {
+      resources.putAll(localResources);
+    }
+    preWarm.setLocalResources(resources);
+
+    // boiler plate task env
+    Map<String, String> taskEnv = new HashMap<String, String>();
+    MRHelpers.updateEnvironmentForMRTasks(tezConf, taskEnv, true);
+    preWarm.setEnvironment(taskEnv);
+    preWarm.setJavaOpts(MRHelpers.getMapJavaOpts(tezConf));
+    return preWarm;
+  }
+
+  /**
    * @param conf
    * @return path to destination directory on hdfs
    * @throws LoginException if we are unable to figure user information

Added: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java?rev=1568231&view=auto
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java (added)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HivePreWarmProcessor.java Fri Feb 14 10:40:19 2014
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalIOProcessor;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
+
+import java.net.URL;
+import java.net.JarURLConnection;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Map;
+import java.util.jar.JarFile;
+import java.util.jar.JarEntry;
+
+import javax.crypto.Mac;
+
+/**
+ * A Tez processor that warms up a newly started container for Hive queries.
+ * It touches the singletons that most queries initialize anyway and eagerly
+ * loads the classes in the hive exec jar whose names contain ql.exec or ql.io
+ * together with vector or Operator. Inputs and outputs are ignored.
+ */
+public class HivePreWarmProcessor implements LogicalIOProcessor {
+
+  /* set after the first successful run so a reused container skips the work */
+  private static boolean prewarmed = false;
+
+  private static final Log LOG = LogFactory.getLog(HivePreWarmProcessor.class);
+
+  private static final String CLASS_SUFFIX = ".class";
+
+  private Configuration conf;
+
+  /**
+   * Rebuilds the configuration from the user payload of the processor context.
+   */
+  @Override
+  public void initialize(TezProcessorContext processorContext)
+    throws Exception {
+    byte[] userPayload = processorContext.getUserPayload();
+    this.conf = TezUtils.createConfFromUserPayload(userPayload);
+  }
+
+  /**
+   * Pre-warms the JVM once; later invocations in the same JVM return immediately.
+   *
+   * @param inputs ignored
+   * @param outputs ignored
+   * @throws Exception if a singleton cannot be initialized or the exec jar cannot be read
+   */
+  @Override
+  public void run(Map<String, LogicalInput> inputs,
+                  Map<String, LogicalOutput> outputs) throws Exception {
+    if(prewarmed) {
+      /* container reuse */
+      return;
+    }
+    /* these are things that go through singleton initialization on most queries;
+     * they are called for their side effects only, the results are not needed */
+    FileSystem.get(conf);
+    Mac.getInstance("HmacSHA1");
+    ReadaheadPool.getInstance();
+    ShimLoader.getHadoopShims();
+
+    URL hiveurl = new URL("jar:"+DagUtils.getInstance().getExecJarPathLocal()+"!/");
+    JarURLConnection hiveconn = (JarURLConnection)hiveurl.openConnection();
+    /* ask for a private JarFile: closing a cached one would break other users of the jar */
+    hiveconn.setUseCaches(false);
+    JarFile hivejar = hiveconn.getJarFile();
+    try {
+      Enumeration<JarEntry> entries = hivejar.entries();
+      while(entries.hasMoreElements()) {
+        String klass = toPreWarmClassName(entries.nextElement().getName());
+        if (klass != null) {
+          loadQuietly(klass);
+        }
+      }
+    } finally {
+      hivejar.close();
+    }
+    prewarmed = true;
+  }
+
+  /**
+   * Maps a jar entry name to the class name to pre-load, or null if the entry is skipped.
+   * Several hive classes depend on the metastore APIs, which are not included in
+   * hive-exec.jar. The relatively safe ones are kept - operators and io classes.
+   */
+  private static String toPreWarmClassName(String entryName) {
+    if (!entryName.endsWith(CLASS_SUFFIX)) {
+      return null;
+    }
+    /* strip only the trailing suffix so a ".class" elsewhere in the name is left alone */
+    String klass = entryName.substring(0, entryName.length() - CLASS_SUFFIX.length())
+        .replace('/', '.');
+    boolean execOrIo = klass.contains("ql.exec") || klass.contains("ql.io");
+    boolean safeKind = klass.contains("vector") || klass.contains("Operator");
+    return (execOrIo && safeKind) ? klass : null;
+  }
+
+  /**
+   * Loads and initializes a class; a class that cannot be loaded is logged and skipped
+   * because pre-warming is an optimization and must not fail the task.
+   */
+  private static void loadQuietly(String klass) {
+    try {
+      Class.forName(klass);
+    } catch (ClassNotFoundException e) {
+      LOG.debug("Skipping pre-warm of " + klass, e);
+    } catch (LinkageError e) {
+      LOG.debug("Skipping pre-warm of " + klass, e);
+    }
+  }
+
+  @Override
+  public void handleEvents(List<Event> processorEvents) {
+    // Nothing to do
+  }
+
+  @Override
+  public void close() throws Exception {
+    // Nothing to cleanup
+  }
+}

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1568231&r1=1568230&r2=1568231&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Fri Feb 14 10:40:19 2014
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.client.AMConfiguration;
@@ -43,7 +44,7 @@ import org.apache.tez.client.TezSessionC
 import org.apache.tez.dag.api.SessionNotRunning;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.client.PreWarmContext;
 
 /**
  * Holds session state related to Tez
@@ -134,8 +135,24 @@ public class TezSessionState {
     session = new TezSession("HIVE-"+sessionId, sessionConfig);
 
     LOG.info("Opening new Tez Session (id: "+sessionId+", scratch dir: "+tezScratchDir+")");
+
     session.start();
 
+    if (HiveConf.getBoolVar(conf, ConfVars.HIVE_PREWARM_ENABLED)) {
+      int n = HiveConf.getIntVar(conf, ConfVars.HIVE_PREWARM_NUM_CONTAINERS);
+      LOG.info("Prewarming " + n + " containers  (id: " + sessionId
+          + ", scratch dir: " + tezScratchDir + ")");
+      PreWarmContext context = utils.createPreWarmContext(sessionConfig, n,
+          commonLocalResources);
+      try {
+        session.preWarm(context);
+      } catch (InterruptedException ie) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Hive Prewarm threw an exception ", ie);
+        }
+      }
+    }
+
     // In case we need to run some MR jobs, we'll run them under tez MR emulation. The session
     // id is used for tez to reuse the current session rather than start a new one.
     conf.set("mapreduce.framework.name", "yarn-tez");

Modified: hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/session/TestSessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/session/TestSessionState.java?rev=1568231&r1=1568230&r2=1568231&view=diff
==============================================================================
--- hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/session/TestSessionState.java (original)
+++ hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/session/TestSessionState.java Fri Feb 14 10:40:19 2014
@@ -20,20 +20,43 @@ package org.apache.hadoop.hive.ql.sessio
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
+import java.util.Arrays;
+import java.util.Collection;
+
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 /**
  * Test SessionState
  */
+@RunWith(value = Parameterized.class)
 public class TestSessionState {
 
+  private final boolean prewarm;
+
+  public TestSessionState(Boolean mode) {
+    this.prewarm = mode.booleanValue();
+  }
+
+  @Parameters
+  public static Collection<Boolean[]> data() {
+    return Arrays.asList(new Boolean[][] { {false}, {true}});
+  }
 
   @Before
-  public void setup(){
-    SessionState.start(new HiveConf());
+  public void setup() {
+    HiveConf conf = new HiveConf();
+    if (prewarm) {
+      HiveConf.setBoolVar(conf, ConfVars.HIVE_PREWARM_ENABLED, true);
+      HiveConf.setIntVar(conf, ConfVars.HIVE_PREWARM_NUM_CONTAINERS, 1);
+    }
+    SessionState.start(conf);
   }
 
   /**