You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/08/22 13:04:41 UTC
svn commit: r1619739 - in /hive/branches/tez:
itests/util/src/main/java/org/apache/hadoop/hive/ql/
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/
Author: gunther
Date: Fri Aug 22 11:04:41 2014
New Revision: 1619739
URL: http://svn.apache.org/r1619739
Log:
HIVE-7891: Fix NPE in split generation on Tez 0.5 (Gunther Hagleitner)
Modified:
hive/branches/tez/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
Modified: hive/branches/tez/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java?rev=1619739&r1=1619738&r2=1619739&view=diff
==============================================================================
--- hive/branches/tez/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java (original)
+++ hive/branches/tez/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java Fri Aug 22 11:04:41 2014
@@ -43,7 +43,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@@ -64,10 +63,10 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hive.cli.CliDriver;
import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.common.io.CachingPrintStream;
import org.apache.hadoop.hive.common.io.DigestPrintStream;
import org.apache.hadoop.hive.common.io.SortAndDigestPrintStream;
import org.apache.hadoop.hive.common.io.SortPrintStream;
-import org.apache.hadoop.hive.common.io.CachingPrintStream;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
@@ -75,8 +74,6 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.vector.util.AllVectorTypesRecord;
-import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -87,22 +84,14 @@ import org.apache.hadoop.hive.ql.parse.P
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.thrift.ThriftDeserializer;
-import org.apache.hadoop.hive.serde2.thrift.test.Complex;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.Shell;
import org.apache.hive.common.util.StreamPrinter;
-import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.tools.ant.BuildException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
-import org.junit.Assume;
import com.google.common.collect.ImmutableList;
@@ -145,8 +134,8 @@ public class QTestUtil {
private QTestSetup setup = null;
private boolean isSessionStateStarted = false;
- private String initScript;
- private String cleanupScript;
+ private final String initScript;
+ private final String cleanupScript;
static {
for (String srcTable : System.getProperty("test.src.tables", "").trim().split(",")) {
@@ -332,14 +321,6 @@ public class QTestUtil {
HadoopShims shims = ShimLoader.getHadoopShims();
int numberOfDataNodes = 4;
- // can run tez tests only on hadoop 2
- if (clusterType == MiniClusterType.tez) {
- Assume.assumeTrue(ShimLoader.getMajorVersion().equals("0.23"));
-    // this is necessary temporarily - there's a problem with multi datanodes on MiniTezCluster
- // will be fixed in 0.3
- numberOfDataNodes = 1;
- }
-
if (clusterType != MiniClusterType.none) {
dfs = shims.getMiniDfs(conf, numberOfDataNodes, true, null);
FileSystem fs = dfs.getFileSystem();
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1619739&r1=1619738&r2=1619739&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Fri Aug 22 11:04:41 2014
@@ -346,7 +346,7 @@ public class DagUtils {
/**
* Utility method to create a stripped down configuration for the MR partitioner.
- *
+ *
* @param partitionerClassName
* the real MR partitioner class name
* @param baseConf
@@ -427,7 +427,7 @@ public class DagUtils {
// use tez to combine splits
boolean groupSplitsInInputInitializer;
-
+
DataSourceDescriptor dataSource;
int numTasks = -1;
@@ -462,11 +462,12 @@ public class DagUtils {
}
}
- // set up the operator plan. Before setting up Inputs since the config is updated.
- Utilities.setMapWork(conf, mapWork, mrScratchDir, false);
-
if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION)
&& !mapWork.isUseOneNullRowInputFormat()) {
+
+ // set up the operator plan. (before setting up splits on the AM)
+ Utilities.setMapWork(conf, mapWork, mrScratchDir, false);
+
// if we're generating the splits in the AM, we just need to set
// the correct plugin.
if (groupSplitsInInputInitializer) {
@@ -484,6 +485,9 @@ public class DagUtils {
dataSource = MRInputHelpers.configureMRInputWithLegacySplitGeneration(conf, new Path(tezDir,
"split_" + mapWork.getName().replaceAll(" ", "_")), true);
numTasks = dataSource.getNumberOfShards();
+
+ // set up the operator plan. (after generating splits - that changes configs)
+ Utilities.setMapWork(conf, mapWork, mrScratchDir, false);
}
UserPayload serializedConf = TezUtils.createUserPayloadFromConf(conf);
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1619739&r1=1619738&r2=1619739&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Fri Aug 22 11:04:41 2014
@@ -64,6 +64,23 @@ public class MapRecordProcessor extends
protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
private MapWork mapWork;
+ public MapRecordProcessor(JobConf jconf) {
+ ObjectCache cache = ObjectCacheFactory.getCache(jconf);
+ execContext.setJc(jconf);
+ // create map and fetch operators
+ mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
+ if (mapWork == null) {
+ mapWork = Utilities.getMapWork(jconf);
+ cache.cache(MAP_PLAN_KEY, mapWork);
+ l4j.info("Plan: "+mapWork);
+ for (String s: mapWork.getAliases()) {
+ l4j.info("Alias: "+s);
+ }
+ } else {
+ Utilities.setMapWork(jconf, mapWork);
+ }
+ }
+
@Override
void init(JobConf jconf, ProcessorContext processorContext, MRTaskReporter mrReporter,
Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
@@ -87,22 +104,7 @@ public class MapRecordProcessor extends
((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize();
}
- ObjectCache cache = ObjectCacheFactory.getCache(jconf);
try {
-
- execContext.setJc(jconf);
- // create map and fetch operators
- mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
- if (mapWork == null) {
- mapWork = Utilities.getMapWork(jconf);
- cache.cache(MAP_PLAN_KEY, mapWork);
- l4j.info("Plan: "+mapWork);
- for (String s: mapWork.getAliases()) {
- l4j.info("Alias: "+s);
- }
- } else {
- Utilities.setMapWork(jconf, mapWork);
- }
if (mapWork.getVectorMode()) {
mapOp = new VectorMapOperator();
} else {
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1619739&r1=1619738&r2=1619739&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Fri Aug 22 11:04:41 2014
@@ -130,7 +130,7 @@ public class TezProcessor extends Abstra
LOG.info("Running task: " + getContext().getUniqueIdentifier());
if (isMap) {
- rproc = new MapRecordProcessor();
+ rproc = new MapRecordProcessor(jobConf);
MRInputLegacy mrInput = getMRInput(inputs);
try {
mrInput.init();
@@ -201,6 +201,7 @@ public class TezProcessor extends Abstra
this.writer = (KeyValueWriter) output.getWriter();
}
+ @Override
public void collect(Object key, Object value) throws IOException {
writer.write(key, value);
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1619739&r1=1619738&r2=1619739&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Fri Aug 22 11:04:41 2014
@@ -134,7 +134,7 @@ public class TezTask extends Task<TezWor
}
List<LocalResource> additionalLr = session.getLocalizedResources();
-
+
// log which resources we're adding (apart from the hive exec)
if (LOG.isDebugEnabled()) {
if (additionalLr == null || additionalLr.size() == 0) {
@@ -165,7 +165,7 @@ public class TezTask extends Task<TezWor
counters = client.getDAGStatus(statusGetOpts).getDAGCounters();
TezSessionPoolManager.getInstance().returnSession(session);
- if (LOG.isInfoEnabled()) {
+ if (LOG.isInfoEnabled() && counters != null) {
for (CounterGroup group: counters) {
LOG.info(group.getDisplayName() +":");
for (TezCounter counter: group) {