You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/08/22 13:04:41 UTC

svn commit: r1619739 - in /hive/branches/tez: itests/util/src/main/java/org/apache/hadoop/hive/ql/ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/

Author: gunther
Date: Fri Aug 22 11:04:41 2014
New Revision: 1619739

URL: http://svn.apache.org/r1619739
Log:
HIVE-7891: Fix NPE in split generation on Tez 0.5 (Gunther Hagleitner)

Modified:
    hive/branches/tez/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java

Modified: hive/branches/tez/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java?rev=1619739&r1=1619738&r2=1619739&view=diff
==============================================================================
--- hive/branches/tez/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java (original)
+++ hive/branches/tez/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java Fri Aug 22 11:04:41 2014
@@ -43,7 +43,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Deque;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -64,10 +63,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.apache.hadoop.hive.cli.CliDriver;
 import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.common.io.CachingPrintStream;
 import org.apache.hadoop.hive.common.io.DigestPrintStream;
 import org.apache.hadoop.hive.common.io.SortAndDigestPrintStream;
 import org.apache.hadoop.hive.common.io.SortPrintStream;
-import org.apache.hadoop.hive.common.io.CachingPrintStream;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
@@ -75,8 +74,6 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.vector.util.AllVectorTypesRecord;
-import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -87,22 +84,14 @@ import org.apache.hadoop.hive.ql.parse.P
 import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer;
-import org.apache.hadoop.hive.serde2.thrift.test.Complex;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.util.Shell;
 import org.apache.hive.common.util.StreamPrinter;
-import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.tools.ant.BuildException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
-import org.junit.Assume;
 
 import com.google.common.collect.ImmutableList;
 
@@ -145,8 +134,8 @@ public class QTestUtil {
   private QTestSetup setup = null;
   private boolean isSessionStateStarted = false;
 
-  private String initScript;
-  private String cleanupScript;
+  private final String initScript;
+  private final String cleanupScript;
 
   static {
     for (String srcTable : System.getProperty("test.src.tables", "").trim().split(",")) {
@@ -332,14 +321,6 @@ public class QTestUtil {
     HadoopShims shims = ShimLoader.getHadoopShims();
     int numberOfDataNodes = 4;
 
-    // can run tez tests only on hadoop 2
-    if (clusterType == MiniClusterType.tez) {
-      Assume.assumeTrue(ShimLoader.getMajorVersion().equals("0.23"));
-      // this is necessary temporarily - there's a probem with multi datanodes on MiniTezCluster
-      // will be fixed in 0.3
-      numberOfDataNodes = 1;
-    }
-
     if (clusterType != MiniClusterType.none) {
       dfs = shims.getMiniDfs(conf, numberOfDataNodes, true, null);
       FileSystem fs = dfs.getFileSystem();

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1619739&r1=1619738&r2=1619739&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Fri Aug 22 11:04:41 2014
@@ -346,7 +346,7 @@ public class DagUtils {
 
   /**
    * Utility method to create a stripped down configuration for the MR partitioner.
-   * 
+   *
    * @param partitionerClassName
    *          the real MR partitioner class name
    * @param baseConf
@@ -427,7 +427,7 @@ public class DagUtils {
 
     // use tez to combine splits
     boolean groupSplitsInInputInitializer;
-    
+
     DataSourceDescriptor dataSource;
 
     int numTasks = -1;
@@ -462,11 +462,12 @@ public class DagUtils {
       }
     }
 
-    // set up the operator plan. Before setting up Inputs since the config is updated.
-    Utilities.setMapWork(conf, mapWork, mrScratchDir, false);
-    
     if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION)
         && !mapWork.isUseOneNullRowInputFormat()) {
+
+      // set up the operator plan. (before setting up splits on the AM)
+      Utilities.setMapWork(conf, mapWork, mrScratchDir, false);
+
       // if we're generating the splits in the AM, we just need to set
       // the correct plugin.
       if (groupSplitsInInputInitializer) {
@@ -484,6 +485,9 @@ public class DagUtils {
       dataSource = MRInputHelpers.configureMRInputWithLegacySplitGeneration(conf, new Path(tezDir,
           "split_" + mapWork.getName().replaceAll(" ", "_")), true);
       numTasks = dataSource.getNumberOfShards();
+
+      // set up the operator plan. (after generating splits - that changes configs)
+      Utilities.setMapWork(conf, mapWork, mrScratchDir, false);
     }
 
     UserPayload serializedConf = TezUtils.createUserPayloadFromConf(conf);

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1619739&r1=1619738&r2=1619739&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Fri Aug 22 11:04:41 2014
@@ -64,6 +64,23 @@ public class MapRecordProcessor extends 
   protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
   private MapWork mapWork;
 
+  public MapRecordProcessor(JobConf jconf) {
+    ObjectCache cache = ObjectCacheFactory.getCache(jconf);
+    execContext.setJc(jconf);
+    // create map and fetch operators
+    mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
+    if (mapWork == null) {
+      mapWork = Utilities.getMapWork(jconf);
+      cache.cache(MAP_PLAN_KEY, mapWork);
+      l4j.info("Plan: "+mapWork);
+      for (String s: mapWork.getAliases()) {
+        l4j.info("Alias: "+s);
+      }
+    } else {
+      Utilities.setMapWork(jconf, mapWork);
+    }
+  }
+
   @Override
   void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter,
       Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
@@ -87,22 +104,7 @@ public class MapRecordProcessor extends 
       ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize();
     }
 
-    ObjectCache cache = ObjectCacheFactory.getCache(jconf);
     try {
-
-      execContext.setJc(jconf);
-      // create map and fetch operators
-      mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
-      if (mapWork == null) {
-        mapWork = Utilities.getMapWork(jconf);
-        cache.cache(MAP_PLAN_KEY, mapWork);
-        l4j.info("Plan: "+mapWork);
-        for (String s: mapWork.getAliases()) {
-          l4j.info("Alias: "+s);
-        }
-      } else {
-        Utilities.setMapWork(jconf, mapWork);
-      }
       if (mapWork.getVectorMode()) {
         mapOp = new VectorMapOperator();
       } else {

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1619739&r1=1619738&r2=1619739&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Fri Aug 22 11:04:41 2014
@@ -130,7 +130,7 @@ public class TezProcessor extends Abstra
       LOG.info("Running task: " + getContext().getUniqueIdentifier());
 
       if (isMap) {
-        rproc = new MapRecordProcessor();
+        rproc = new MapRecordProcessor(jobConf);
         MRInputLegacy mrInput = getMRInput(inputs);
         try {
           mrInput.init();
@@ -201,6 +201,7 @@ public class TezProcessor extends Abstra
       this.writer = (KeyValueWriter) output.getWriter();
     }
 
+    @Override
     public void collect(Object key, Object value) throws IOException {
       writer.write(key, value);
     }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1619739&r1=1619738&r2=1619739&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Fri Aug 22 11:04:41 2014
@@ -134,7 +134,7 @@ public class TezTask extends Task<TezWor
       }
 
       List<LocalResource> additionalLr = session.getLocalizedResources();
-      
+
       // log which resources we're adding (apart from the hive exec)
       if (LOG.isDebugEnabled()) {
         if (additionalLr == null || additionalLr.size() == 0) {
@@ -165,7 +165,7 @@ public class TezTask extends Task<TezWor
       counters = client.getDAGStatus(statusGetOpts).getDAGCounters();
       TezSessionPoolManager.getInstance().returnSession(session);
 
-      if (LOG.isInfoEnabled()) {
+      if (LOG.isInfoEnabled() && counters != null) {
         for (CounterGroup group: counters) {
           LOG.info(group.getDisplayName() +":");
           for (TezCounter counter: group) {