Posted to commits@hive.apache.org by na...@apache.org on 2014/10/21 08:48:11 UTC
svn commit: r1633283 - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/ data/conf/
itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/
itests/hive-unit/src/test/java/org/apache/hive/beeline/
jdbc/src/java/org/apache/hive/jdbc/ ql/src...
Author: navis
Date: Tue Oct 21 06:48:11 2014
New Revision: 1633283
URL: http://svn.apache.org/r1633283
Log:
HIVE-8186 : Self join may fail if one side has virtual column(s) and other doesn't (Navis reviewed by Sergey Shelukhin)
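The failing scenario is the one exercised by the new test added to join_vc.q below: a self join where only one side of the join references a virtual column, e.g.

  select t2.BLOCK__OFFSET__INSIDE__FILE
  from src t1 join src t2 on t1.key = t2.key where t1.key < 100;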
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/data/conf/hive-log4j.properties
hive/trunk/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestJdbcWithMiniKdc.java
hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java
hive/trunk/jdbc/src/java/org/apache/hive/jdbc/Utils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
hive/trunk/ql/src/test/queries/clientpositive/join_vc.q
hive/trunk/ql/src/test/results/clientpositive/join_vc.q.out
hive/trunk/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1633283&r1=1633282&r2=1633283&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Tue Oct 21 06:48:11 2014
@@ -598,8 +598,6 @@ public class HiveConf extends Configurat
new TimeValidator(TimeUnit.SECONDS),
"How long to run autoprogressor for the script/UDTF operators.\n" +
"Set to 0 for forever."),
- HIVETABLENAME("hive.table.name", "", ""),
- HIVEPARTITIONNAME("hive.partition.name", "", ""),
HIVESCRIPTAUTOPROGRESS("hive.script.auto.progress", false,
"Whether Hive Transform/Map/Reduce Clause should automatically send progress information to TaskTracker \n" +
"to avoid the task getting killed because of inactivity. Hive sends progress information when the script is \n" +
Modified: hive/trunk/data/conf/hive-log4j.properties
URL: http://svn.apache.org/viewvc/hive/trunk/data/conf/hive-log4j.properties?rev=1633283&r1=1633282&r2=1633283&view=diff
==============================================================================
--- hive/trunk/data/conf/hive-log4j.properties (original)
+++ hive/trunk/data/conf/hive-log4j.properties Tue Oct 21 06:48:11 2014
@@ -75,6 +75,11 @@ log4j.category.JPOX.Query=ERROR,DRFA
log4j.category.JPOX.General=ERROR,DRFA
log4j.category.JPOX.Enhancer=ERROR,DRFA
log4j.logger.org.apache.hadoop.conf.Configuration=ERROR,DRFA
+log4j.logger.org.apache.zookeeper=INFO,DRFA
+log4j.logger.org.apache.zookeeper.server.ServerCnxn=WARN,DRFA
log4j.logger.org.apache.zookeeper.server.NIOServerCnxn=WARN,DRFA
+log4j.logger.org.apache.zookeeper.ClientCnxn=WARN,DRFA
+log4j.logger.org.apache.zookeeper.ClientCnxnSocket=WARN,DRFA
log4j.logger.org.apache.zookeeper.ClientCnxnSocketNIO=WARN,DRFA
-
+log4j.logger.org.apache.hadoop.hive.ql.log.PerfLogger=WARN,DRFA
+log4j.logger.org.apache.hadoop.hive.ql.exec.Operator=INFO,DRFA
Modified: hive/trunk/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestJdbcWithMiniKdc.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestJdbcWithMiniKdc.java?rev=1633283&r1=1633282&r2=1633283&view=diff
==============================================================================
--- hive/trunk/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestJdbcWithMiniKdc.java (original)
+++ hive/trunk/itests/hive-minikdc/src/test/java/org/apache/hive/minikdc/TestJdbcWithMiniKdc.java Tue Oct 21 06:48:11 2014
@@ -19,7 +19,6 @@
package org.apache.hive.minikdc;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -153,7 +152,7 @@ public class TestJdbcWithMiniKdc {
}
/***
- * Negtive test for token based authentication
+ * Negative test for token based authentication
* Verify that a user can't retrieve a token for user that
* it's not allowed to impersonate
* @throws Exception
@@ -163,13 +162,20 @@ public class TestJdbcWithMiniKdc {
miniHiveKdc.loginUser(MiniHiveKdc.HIVE_TEST_SUPER_USER);
hs2Conn = DriverManager.getConnection(miniHS2.getJdbcURL());
- // retrieve token and store in the cache
- String token = ((HiveConnection)hs2Conn).getDelegationToken(
- MiniHiveKdc.HIVE_TEST_USER_2, MiniHiveKdc.HIVE_SERVICE_PRINCIPAL);
- hs2Conn.close();
+ try {
+ // retrieve token and store in the cache
+ String token = ((HiveConnection)hs2Conn).getDelegationToken(
+ MiniHiveKdc.HIVE_TEST_USER_2, MiniHiveKdc.HIVE_SERVICE_PRINCIPAL);
- assertNull(MiniHiveKdc.HIVE_TEST_SUPER_USER + " shouldn't be allowed to create token for " +
- MiniHiveKdc.HIVE_TEST_USER_2, token);
+ fail(MiniHiveKdc.HIVE_TEST_SUPER_USER + " shouldn't be allowed to retrieve token for " +
+ MiniHiveKdc.HIVE_TEST_USER_2);
+ } catch (SQLException e) {
+ // Expected error
+ assertTrue(e.getMessage().contains("Failed to validate proxy privilege"));
+ assertTrue(e.getCause().getCause().getMessage().contains("Failed to validate proxy privilege"));
+ } finally {
+ hs2Conn.close();
+ }
}
/**
@@ -201,7 +207,9 @@ public class TestJdbcWithMiniKdc {
+ MiniHiveKdc.HIVE_TEST_USER_2);
} catch (SQLException e) {
// Expected error
- assertEquals("08S01", e.getSQLState().trim());
+ e.printStackTrace();
+ assertTrue(e.getMessage().contains("Failed to validate proxy privilege"));
+ assertTrue(e.getCause().getCause().getMessage().contains("is not allowed to impersonate"));
}
}
Modified: hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java?rev=1633283&r1=1633282&r2=1633283&view=diff
==============================================================================
--- hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java (original)
+++ hive/trunk/itests/hive-unit/src/test/java/org/apache/hive/beeline/TestBeeLineWithArgs.java Tue Oct 21 06:48:11 2014
@@ -164,7 +164,7 @@ public class TestBeeLineWithArgs {
* Test for presence of an expected pattern
* in the output (stdout or stderr), fail if not found
* Print PASSED or FAILED
- * @paramm testName Name of test to print
+ * @param testName Name of test to print
* @param expectedPattern Text to look for in command output/error
* @param shouldMatch true if the pattern should be found, false if it should not
* @throws Exception on command execution error
@@ -260,9 +260,9 @@ public class TestBeeLineWithArgs {
public void testBeelineHiveConfVariable() throws Throwable {
List<String> argList = getBaseArgs(JDBC_URL);
argList.add("--hiveconf");
- argList.add("hive.table.name=dummy");
+ argList.add("test.hive.table.name=dummy");
final String TEST_NAME = "testBeelineHiveConfVariable";
- final String SCRIPT_TEXT = "create table ${hiveconf:hive.table.name} (d int);\nshow tables;\n";
+ final String SCRIPT_TEXT = "create table ${hiveconf:test.hive.table.name} (d int);\nshow tables;\n";
final String EXPECTED_PATTERN = "dummy";
testScriptFile(TEST_NAME, SCRIPT_TEXT, EXPECTED_PATTERN, true, argList);
}
Modified: hive/trunk/jdbc/src/java/org/apache/hive/jdbc/Utils.java
URL: http://svn.apache.org/viewvc/hive/trunk/jdbc/src/java/org/apache/hive/jdbc/Utils.java?rev=1633283&r1=1633282&r2=1633283&view=diff
==============================================================================
--- hive/trunk/jdbc/src/java/org/apache/hive/jdbc/Utils.java (original)
+++ hive/trunk/jdbc/src/java/org/apache/hive/jdbc/Utils.java Tue Oct 21 06:48:11 2014
@@ -90,7 +90,7 @@ public class Utils {
static final String HTTP_PATH_DEPRECATED = "hive.server2.thrift.http.path";
static final String HTTP_PATH = "httpPath";
static final String SERVICE_DISCOVERY_MODE = "serviceDiscoveryMode";
- // Don't use dynamic serice discovery
+ // Don't use dynamic service discovery
static final String SERVICE_DISCOVERY_MODE_NONE = "none";
// Use ZooKeeper for indirection while using dynamic service discovery
static final String SERVICE_DISCOVERY_MODE_ZOOKEEPER = "zooKeeper";
@@ -226,10 +226,11 @@ public class Utils {
// Verify success and optionally with_info status, else throw SQLException
public static void verifySuccess(TStatus status, boolean withInfo) throws SQLException {
- if ((status.getStatusCode() != TStatusCode.SUCCESS_STATUS) &&
- (withInfo && (status.getStatusCode() != TStatusCode.SUCCESS_WITH_INFO_STATUS))) {
- throw new HiveSQLException(status);
+ if (status.getStatusCode() == TStatusCode.SUCCESS_STATUS ||
+ (withInfo && status.getStatusCode() == TStatusCode.SUCCESS_WITH_INFO_STATUS)) {
+ return;
}
+ throw new HiveSQLException(status);
}
/**
@@ -392,8 +393,9 @@ public class Utils {
* Also log a deprecation message for the client.
* @param fromMap
* @param toMap
- * @param oldName
+ * @param deprecatedName
* @param newName
+ * @param newUsage
*/
private static void handleParamDeprecation(Map<String, String> fromMap, Map<String, String> toMap,
String deprecatedName, String newName, String newUsage) {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=1633283&r1=1633282&r2=1633283&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Tue Oct 21 06:48:11 2014
@@ -23,20 +23,20 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
-import org.apache.hadoop.hive.ql.io.IOContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -59,6 +59,7 @@ import org.apache.hadoop.hive.serde2.typ
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.StringUtils;
/**
@@ -83,58 +84,24 @@ public class MapOperator extends Operato
private final transient LongWritable recordCounter = new LongWritable();
protected transient long numRows = 0;
protected transient long cntr = 1;
- protected final boolean isInfoEnabled = LOG.isInfoEnabled();
- protected final boolean isDebugEnabled = LOG.isDebugEnabled();
- private final Map<MapInputPath, MapOpCtx> opCtxMap = new HashMap<MapInputPath, MapOpCtx>();
- private final Map<Operator<? extends OperatorDesc>, MapOpCtx> childrenOpToOpCtxMap =
- new HashMap<Operator<? extends OperatorDesc>, MapOpCtx>();
-
- protected transient MapOpCtx current;
- private transient List<Operator<? extends OperatorDesc>> extraChildrenToClose = null;
- private final Map<String, Path> normalizedPaths = new HashMap<String, Path>();
-
- private static class MapInputPath {
- String path;
- String alias;
- Operator<?> op;
- PartitionDesc partDesc;
-
- /**
- * @param path
- * @param alias
- * @param op
- */
- public MapInputPath(String path, String alias, Operator<?> op, PartitionDesc partDesc) {
- this.path = path;
- this.alias = alias;
- this.op = op;
- this.partDesc = partDesc;
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof MapInputPath) {
- MapInputPath mObj = (MapInputPath) o;
- return path.equals(mObj.path) && alias.equals(mObj.alias)
- && op.equals(mObj.op);
- }
-
- return false;
- }
-
- @Override
- public int hashCode() {
- int ret = (path == null) ? 0 : path.hashCode();
- ret += (alias == null) ? 0 : alias.hashCode();
- ret += (op == null) ? 0 : op.hashCode();
- return ret;
- }
- }
+ // input path --> {operator --> context}
+ private final Map<String, Map<Operator<?>, MapOpCtx>> opCtxMap =
+ new HashMap<String, Map<Operator<?>, MapOpCtx>>();
+ // child operator --> object inspector (converted OI if it's needed)
+ private final Map<Operator<?>, StructObjectInspector> childrenOpToOI =
+ new HashMap<Operator<?>, StructObjectInspector>();
+
+ // context for current input file
+ protected transient MapOpCtx[] currentCtxs;
+ private transient final Map<String, Path> normalizedPaths = new HashMap<String, Path>();
protected static class MapOpCtx {
- StructObjectInspector tblRawRowObjectInspector; // columns
+ final String alias;
+ final Operator<?> op;
+ final PartitionDesc partDesc;
+
StructObjectInspector partObjectInspector; // partition columns
StructObjectInspector vcsObjectInspector; // virtual columns
StructObjectInspector rowObjectInspector;
@@ -150,6 +117,12 @@ public class MapOperator extends Operato
List<VirtualColumn> vcs;
Object[] vcValues;
+ public MapOpCtx(String alias, Operator<?> op, PartitionDesc partDesc) {
+ this.alias = alias;
+ this.op = op;
+ this.partDesc = partDesc;
+ }
+
private boolean isPartitioned() {
return partObjectInspector != null;
}
@@ -158,12 +131,30 @@ public class MapOperator extends Operato
return vcsObjectInspector != null;
}
- private Object readRow(Writable value) throws SerDeException {
- return partTblObjectInspectorConverter.convert(deserializer.deserialize(value));
+ private Object readRow(Writable value, ExecMapperContext context) throws SerDeException {
+ Object deserialized = deserializer.deserialize(value);
+ Object row = partTblObjectInspectorConverter.convert(deserialized);
+ if (hasVC()) {
+ rowWithPartAndVC[0] = row;
+ if (context != null) {
+ populateVirtualColumnValues(context, vcs, vcValues, deserializer);
+ }
+ int vcPos = isPartitioned() ? 2 : 1;
+ rowWithPartAndVC[vcPos] = vcValues;
+ return rowWithPartAndVC;
+ } else if (isPartitioned()) {
+ rowWithPart[0] = row;
+ return rowWithPart;
+ }
+ return row;
}
- public StructObjectInspector getRowObjectInspector() {
- return rowObjectInspector;
+ public boolean forward(Object row) throws HiveException {
+ if (op.getDone()) {
+ return false;
+ }
+ op.processOp(row, 0);
+ return true;
}
}
@@ -176,20 +167,20 @@ public class MapOperator extends Operato
* @param mapWork
* @throws HiveException
*/
- public void initializeAsRoot(Configuration hconf, MapWork mapWork)
- throws HiveException {
+ @VisibleForTesting
+ void initializeAsRoot(JobConf hconf, MapWork mapWork) throws Exception {
setConf(mapWork);
setChildren(hconf);
+ setExecContext(new ExecMapperContext(hconf));
initialize(hconf, null);
}
- private MapOpCtx initObjectInspector(Configuration hconf, MapInputPath ctx,
- Map<TableDesc, StructObjectInspector> convertedOI) throws Exception {
+ private MapOpCtx initObjectInspector(Configuration hconf, MapOpCtx opCtx,
+ StructObjectInspector tableRowOI) throws Exception {
- PartitionDesc pd = ctx.partDesc;
+ PartitionDesc pd = opCtx.partDesc;
TableDesc td = pd.getTableDesc();
- MapOpCtx opCtx = new MapOpCtx();
// Use table properties in case of unpartitioned tables,
// and the union of table properties and partition properties, with partition
// taking precedence, in the case of partitioned tables
@@ -200,18 +191,13 @@ public class MapOperator extends Operato
opCtx.tableName = String.valueOf(overlayedProps.getProperty("name"));
opCtx.partName = String.valueOf(partSpec);
-
- Class serdeclass = hconf.getClassByName(pd.getSerdeClassName());
- opCtx.deserializer = (Deserializer) serdeclass.newInstance();
- SerDeUtils.initializeSerDe(opCtx.deserializer, hconf, td.getProperties(), pd.getProperties());
+ opCtx.deserializer = pd.getDeserializer(hconf);
StructObjectInspector partRawRowObjectInspector =
(StructObjectInspector) opCtx.deserializer.getObjectInspector();
- opCtx.tblRawRowObjectInspector = convertedOI.get(td);
-
- opCtx.partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(
- partRawRowObjectInspector, opCtx.tblRawRowObjectInspector);
+ opCtx.partTblObjectInspectorConverter =
+ ObjectInspectorConverters.getConverter(partRawRowObjectInspector, tableRowOI);
// Next check if this table has partitions and if so
// get the list of partition names as well as allocate
@@ -259,8 +245,8 @@ public class MapOperator extends Operato
// The op may not be a TableScan for mapjoins
// Consider the query: select /*+MAPJOIN(a)*/ count(*) FROM T1 a JOIN T2 b ON a.key = b.key;
// In that case, it will be a Select, but the rowOI need not be amended
- if (ctx.op instanceof TableScanOperator) {
- TableScanOperator tsOp = (TableScanOperator) ctx.op;
+ if (opCtx.op instanceof TableScanOperator) {
+ TableScanOperator tsOp = (TableScanOperator) opCtx.op;
TableScanDesc tsDesc = tsOp.getConf();
if (tsDesc != null && tsDesc.hasVirtualCols()) {
opCtx.vcs = tsDesc.getVirtualCols();
@@ -274,11 +260,11 @@ public class MapOperator extends Operato
}
}
if (!opCtx.hasVC() && !opCtx.isPartitioned()) {
- opCtx.rowObjectInspector = opCtx.tblRawRowObjectInspector;
+ opCtx.rowObjectInspector = tableRowOI;
return opCtx;
}
List<StructObjectInspector> inspectors = new ArrayList<StructObjectInspector>();
- inspectors.add(opCtx.tblRawRowObjectInspector);
+ inspectors.add(tableRowOI);
if (opCtx.isPartitioned()) {
inspectors.add(opCtx.partObjectInspector);
}
@@ -308,19 +294,14 @@ public class MapOperator extends Operato
for (String onefile : conf.getPathToAliases().keySet()) {
PartitionDesc pd = conf.getPathToPartitionInfo().get(onefile);
TableDesc tableDesc = pd.getTableDesc();
- Properties tblProps = tableDesc.getProperties();
- Class sdclass = hconf.getClassByName(pd.getSerdeClassName());
- Deserializer partDeserializer = (Deserializer) sdclass.newInstance();
- SerDeUtils.initializeSerDe(partDeserializer, hconf, tblProps, pd.getProperties());
- StructObjectInspector partRawRowObjectInspector = (StructObjectInspector) partDeserializer
- .getObjectInspector();
+ Deserializer partDeserializer = pd.getDeserializer(hconf);
+ StructObjectInspector partRawRowObjectInspector =
+ (StructObjectInspector) partDeserializer.getObjectInspector();
StructObjectInspector tblRawRowObjectInspector = tableDescOI.get(tableDesc);
if ((tblRawRowObjectInspector == null) ||
(identityConverterTableDesc.contains(tableDesc))) {
- sdclass = hconf.getClassByName(tableDesc.getSerdeClassName());
- Deserializer tblDeserializer = (Deserializer) sdclass.newInstance();
- SerDeUtils.initializeSerDe(tblDeserializer, hconf, tblProps, null);
+ Deserializer tblDeserializer = tableDesc.getDeserializer(hconf);
tblRawRowObjectInspector =
(StructObjectInspector) ObjectInspectorConverters.getConvertedOI(
partRawRowObjectInspector,
@@ -344,70 +325,85 @@ public class MapOperator extends Operato
return tableDescOI;
}
- public void setChildren(Configuration hconf) throws HiveException {
- Path fpath = IOContext.get(hconf.get(Utilities.INPUT_NAME)).getInputPath();
-
- boolean schemeless = fpath.toUri().getScheme() == null;
+ public void setChildren(Configuration hconf) throws Exception {
List<Operator<? extends OperatorDesc>> children =
new ArrayList<Operator<? extends OperatorDesc>>();
Map<TableDesc, StructObjectInspector> convertedOI = getConvertedOI(hconf);
- try {
- for (Map.Entry<String, ArrayList<String>> entry : conf.getPathToAliases().entrySet()) {
- String onefile = entry.getKey();
- List<String> aliases = entry.getValue();
-
- Path onepath = new Path(onefile);
- if (schemeless) {
- onepath = new Path(onepath.toUri().getPath());
- }
-
- PartitionDesc partDesc = conf.getPathToPartitionInfo().get(onefile);
-
- for (String onealias : aliases) {
- Operator<? extends OperatorDesc> op = conf.getAliasToWork().get(onealias);
- if (isDebugEnabled) {
- LOG.debug("Adding alias " + onealias + " to work list for file "
- + onefile);
- }
- MapInputPath inp = new MapInputPath(onefile, onealias, op, partDesc);
- if (opCtxMap.containsKey(inp)) {
- continue;
- }
- MapOpCtx opCtx = initObjectInspector(hconf, inp, convertedOI);
- opCtxMap.put(inp, opCtx);
+ for (Map.Entry<String, ArrayList<String>> entry : conf.getPathToAliases().entrySet()) {
+ String onefile = entry.getKey();
+ List<String> aliases = entry.getValue();
+ PartitionDesc partDesc = conf.getPathToPartitionInfo().get(onefile);
- op.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>());
- op.getParentOperators().add(this);
- // check for the operators who will process rows coming to this Map
- // Operator
- if (!onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) {
- children.add(op);
- childrenOpToOpCtxMap.put(op, opCtx);
- if (isInfoEnabled) {
- LOG.info("dump " + op + " "
- + opCtxMap.get(inp).rowObjectInspector.getTypeName());
- }
- }
- current = opCtx; // just need for TestOperators.testMapOperator
+ for (String alias : aliases) {
+ Operator<? extends OperatorDesc> op = conf.getAliasToWork().get(alias);
+ if (isLogDebugEnabled) {
+ LOG.debug("Adding alias " + alias + " to work list for file "
+ + onefile);
+ }
+ Map<Operator<?>, MapOpCtx> contexts = opCtxMap.get(onefile);
+ if (contexts == null) {
+ opCtxMap.put(onefile, contexts = new LinkedHashMap<Operator<?>, MapOpCtx>());
}
+ if (contexts.containsKey(op)) {
+ continue;
+ }
+ MapOpCtx context = new MapOpCtx(alias, op, partDesc);
+ StructObjectInspector tableRowOI = convertedOI.get(partDesc.getTableDesc());
+ contexts.put(op, initObjectInspector(hconf, context, tableRowOI));
+
+ op.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>(1));
+ op.getParentOperators().add(this);
+ children.add(op);
}
+ }
+
+ initOperatorContext(children);
- if (children.size() == 0) {
- // didn't find match for input file path in configuration!
- // serious problem ..
- LOG.error("Configuration does not have any alias for path: "
- + fpath.toUri());
- throw new HiveException("Configuration and input path are inconsistent");
+ // we found all the operators that we are supposed to process.
+ setChildOperators(children);
+ }
+
+ private void initOperatorContext(List<Operator<? extends OperatorDesc>> children)
+ throws HiveException {
+ for (Map<Operator<?>, MapOpCtx> contexts : opCtxMap.values()) {
+ for (MapOpCtx context : contexts.values()) {
+ if (!children.contains(context.op)) {
+ continue;
+ }
+ StructObjectInspector prev =
+ childrenOpToOI.put(context.op, context.rowObjectInspector);
+ if (prev != null && !prev.equals(context.rowObjectInspector)) {
+ throw new HiveException("Conflict on row inspector for " + context.alias);
+ }
+ if (isLogInfoEnabled) {
+ LOG.info("dump " + context.op + " " + context.rowObjectInspector.getTypeName());
+ }
}
+ }
+ }
- // we found all the operators that we are supposed to process.
- setChildOperators(children);
- } catch (Exception e) {
- throw new HiveException(e);
+ private String getNominalPath(Path fpath) {
+ String nominal = null;
+ boolean schemaless = fpath.toUri().getScheme() == null;
+ for (String onefile : conf.getPathToAliases().keySet()) {
+ Path onepath = normalizePath(onefile, schemaless);
+ // check for the operators who will process rows coming to this Map Operator
+ if (onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) {
+ // not from this
+ continue;
+ }
+ if (nominal != null) {
+ throw new IllegalStateException("Ambiguous input path " + fpath);
+ }
+ nominal = onefile;
+ }
+ if (nominal == null) {
+ throw new IllegalStateException("Invalid input path " + fpath);
}
+ return nominal;
}
@Override
@@ -422,88 +418,56 @@ public class MapOperator extends Operato
}
statsMap.put(Counter.RECORDS_IN + context, recordCounter);
- List<Operator<? extends OperatorDesc>> children = getChildOperators();
-
- for (Entry<Operator<? extends OperatorDesc>, MapOpCtx> entry : childrenOpToOpCtxMap
- .entrySet()) {
- Operator<? extends OperatorDesc> child = entry.getKey();
- MapOpCtx mapOpCtx = entry.getValue();
- // Add alias, table name, and partitions to hadoop conf so that their
- // children will inherit these
- HiveConf.setVar(hconf, HiveConf.ConfVars.HIVETABLENAME, mapOpCtx.tableName);
- HiveConf.setVar(hconf, HiveConf.ConfVars.HIVEPARTITIONNAME, mapOpCtx.partName);
- child.initialize(hconf, new ObjectInspector[] {mapOpCtx.rowObjectInspector});
- }
-
- for (Entry<MapInputPath, MapOpCtx> entry : opCtxMap.entrySet()) {
- MapInputPath input = entry.getKey();
- MapOpCtx mapOpCtx = entry.getValue();
- // Add alias, table name, and partitions to hadoop conf so that their
- // children will inherit these
- HiveConf.setVar(hconf, HiveConf.ConfVars.HIVETABLENAME, mapOpCtx.tableName);
- HiveConf.setVar(hconf, HiveConf.ConfVars.HIVEPARTITIONNAME, mapOpCtx.partName);
-
- Operator<? extends OperatorDesc> op = input.op;
- if (children.indexOf(op) == -1) {
- // op is not in the children list, so need to remember it and close it afterwards
- if (extraChildrenToClose == null) {
- extraChildrenToClose = new ArrayList<Operator<? extends OperatorDesc>>();
- }
- extraChildrenToClose.add(op);
- op.initialize(hconf, new ObjectInspector[] {entry.getValue().rowObjectInspector});
- }
+ for (Entry<Operator<?>, StructObjectInspector> entry : childrenOpToOI.entrySet()) {
+ Operator<?> child = entry.getKey();
+ child.initialize(hconf, new ObjectInspector[] {entry.getValue()});
}
}
- /**
- * close extra child operators that are initialized but are not executed.
- */
@Override
public void closeOp(boolean abort) throws HiveException {
- if (extraChildrenToClose != null) {
- for (Operator<? extends OperatorDesc> op : extraChildrenToClose) {
- op.close(abort);
- }
- }
recordCounter.set(numRows);
+ super.closeOp(abort);
}
// Find context for current input file
@Override
public void cleanUpInputFileChangedOp() throws HiveException {
+ super.cleanUpInputFileChangedOp();
Path fpath = getExecContext().getCurrentInputPath();
-
- for (String onefile : conf.getPathToAliases().keySet()) {
- Path onepath = normalizePath(onefile);
- // check for the operators who will process rows coming to this Map
- // Operator
- if (onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) {
- // not from this
- continue;
- }
- PartitionDesc partDesc = conf.getPathToPartitionInfo().get(onefile);
- for (String onealias : conf.getPathToAliases().get(onefile)) {
- Operator<? extends OperatorDesc> op = conf.getAliasToWork().get(onealias);
- MapInputPath inp = new MapInputPath(onefile, onealias, op, partDesc);
- MapOpCtx context = opCtxMap.get(inp);
- if (context != null) {
- current = context;
- if (isInfoEnabled) {
- LOG.info("Processing alias " + onealias + " for file " + onefile);
- }
- return;
+ String nominalPath = getNominalPath(fpath);
+ Map<Operator<?>, MapOpCtx> contexts = opCtxMap.get(nominalPath);
+ if (isLogInfoEnabled) {
+ StringBuilder builder = new StringBuilder();
+ for (MapOpCtx context : contexts.values()) {
+ if (builder.length() > 0) {
+ builder.append(", ");
}
+ builder.append(context.alias);
}
+ if (isLogDebugEnabled) {
+ LOG.info("Processing alias(es) " + builder.toString() + " for file " + fpath);
+ }
+ }
+ // Add alias, table name, and partitions to hadoop conf so that their
+ // children will inherit these
+ for (Entry<Operator<?>, MapOpCtx> entry : contexts.entrySet()) {
+ Operator<?> operator = entry.getKey();
+ MapOpCtx context = entry.getValue();
+ operator.setInputContext(nominalPath, context.tableName, context.partName);
}
- throw new IllegalStateException("Invalid path " + fpath);
+ currentCtxs = contexts.values().toArray(new MapOpCtx[contexts.size()]);
}
- private Path normalizePath(String onefile) {
+ private Path normalizePath(String onefile, boolean schemaless) {
//creating Path is expensive, so cache the corresponding
//Path object in normalizedPaths
Path path = normalizedPaths.get(onefile);
- if(path == null){
+ if (path == null) {
path = new Path(onefile);
+ if (schemaless && path.toUri().getScheme() != null) {
+ path = new Path(path.toUri().getPath());
+ }
normalizedPaths.put(onefile, path);
}
return path;
@@ -517,57 +481,46 @@ public class MapOperator extends Operato
// The child operators cleanup if input file has changed
cleanUpInputFileChanged();
}
- Object row;
- try {
- row = current.readRow(value);
- if (current.hasVC()) {
- current.rowWithPartAndVC[0] = row;
- if (context != null) {
- populateVirtualColumnValues(context, current.vcs, current.vcValues, current.deserializer);
- }
- int vcPos = current.isPartitioned() ? 2 : 1;
- current.rowWithPartAndVC[vcPos] = current.vcValues;
- row = current.rowWithPartAndVC;
- } else if (current.isPartitioned()) {
- current.rowWithPart[0] = row;
- row = current.rowWithPart;
- }
- } catch (Exception e) {
- // Serialize the row and output.
- String rawRowString;
+ int childrenDone = 0;
+ for (MapOpCtx current : currentCtxs) {
+ Object row = null;
try {
- rawRowString = value.toString();
- } catch (Exception e2) {
- rawRowString = "[Error getting row data with exception " +
- StringUtils.stringifyException(e2) + " ]";
+ row = current.readRow(value, context);
+ if (!current.forward(row)) {
+ childrenDone++;
+ }
+ } catch (Exception e) {
+ // TODO: policy on deserialization errors
+ String message = toErrorMessage(value, row, current.rowObjectInspector);
+ if (row == null) {
+ deserialize_error_count.set(deserialize_error_count.get() + 1);
+ throw new HiveException("Hive Runtime Error while processing writable " + message, e);
+ }
+ throw new HiveException("Hive Runtime Error while processing row " + message, e);
}
+ }
+ rowForwarded(childrenDone);
+ }
- // TODO: policy on deserialization errors
- deserialize_error_count.set(deserialize_error_count.get() + 1);
- throw new HiveException("Hive Runtime Error while processing writable " + rawRowString, e);
+ protected final void rowForwarded(int childrenDone) {
+ numRows++;
+ if (isLogInfoEnabled && numRows == cntr) {
+ cntr *= 10;
+ LOG.info(toString() + ": records read - " + numRows);
}
+ if (childrenDone == currentCtxs.length) {
+ setDone(true);
+ }
+ }
- // The row has been converted to comply with table schema, irrespective of partition schema.
- // So, use tblOI (and not partOI) for forwarding
+ private String toErrorMessage(Writable value, Object row, ObjectInspector inspector) {
try {
- numRows++;
- if (isInfoEnabled) {
- if (numRows == cntr) {
- cntr *= 10;
- LOG.info(toString() + ": records read - " + numRows);
- }
+ if (row != null) {
+ return SerDeUtils.getJSONString(row, inspector);
}
- forward(row, current.rowObjectInspector);
+ return String.valueOf(value);
} catch (Exception e) {
- // Serialize the row and output the error message.
- String rowString;
- try {
- rowString = SerDeUtils.getJSONString(row, current.rowObjectInspector);
- } catch (Exception e2) {
- rowString = "[Error getting row data with exception " +
- StringUtils.stringifyException(e2) + " ]";
- }
- throw new HiveException("Hive Runtime Error while processing row " + rowString, e);
+ return "[Error getting row data with exception " + StringUtils.stringifyException(e) + " ]";
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1633283&r1=1633282&r2=1633283&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Tue Oct 21 06:48:11 2014
@@ -214,8 +214,11 @@ public abstract class Operator<T extends
protected transient Map<String, LongWritable> statsMap = new HashMap<String, LongWritable>();
@SuppressWarnings("rawtypes")
protected transient OutputCollector out;
- protected transient Log LOG = LogFactory.getLog(this.getClass().getName());
- protected transient boolean isLogInfoEnabled = LOG.isInfoEnabled();
+ protected transient final Log LOG = LogFactory.getLog(getClass().getName());
+ protected transient final Log PLOG = LogFactory.getLog(Operator.class.getName()); // for simple disabling logs from all operators
+ protected transient final boolean isLogInfoEnabled = LOG.isInfoEnabled() && PLOG.isInfoEnabled();
+ protected transient final boolean isLogDebugEnabled = LOG.isDebugEnabled() && PLOG.isDebugEnabled();
+ protected transient final boolean isLogTraceEnabled = LOG.isTraceEnabled() && PLOG.isTraceEnabled();
protected transient String alias;
protected transient Reporter reporter;
protected transient String id;
@@ -491,33 +494,45 @@ public abstract class Operator<T extends
public abstract void processOp(Object row, int tag) throws HiveException;
protected final void defaultStartGroup() throws HiveException {
- LOG.debug("Starting group");
+ if (isLogDebugEnabled) {
+ LOG.debug("Starting group");
+ }
if (childOperators == null) {
return;
}
- LOG.debug("Starting group for children:");
+ if (isLogDebugEnabled) {
+ LOG.debug("Starting group for children:");
+ }
for (Operator<? extends OperatorDesc> op : childOperators) {
op.startGroup();
}
- LOG.debug("Start group Done");
+ if (isLogDebugEnabled) {
+ LOG.debug("Start group Done");
+ }
}
protected final void defaultEndGroup() throws HiveException {
- LOG.debug("Ending group");
+ if (isLogDebugEnabled) {
+ LOG.debug("Ending group");
+ }
if (childOperators == null) {
return;
}
- LOG.debug("Ending group for children:");
+ if (isLogDebugEnabled) {
+ LOG.debug("Ending group for children:");
+ }
for (Operator<? extends OperatorDesc> op : childOperators) {
op.endGroup();
}
- LOG.debug("End group Done");
+ if (isLogDebugEnabled) {
+ LOG.debug("End group Done");
+ }
}
// If a operator wants to do some work at the beginning of a group
@@ -1047,6 +1062,17 @@ public abstract class Operator<T extends
public void cleanUpInputFileChangedOp() throws HiveException {
}
+ // called by map operator. propagated recursively to single parented descendants
+ public void setInputContext(String inputPath, String tableName, String partitionName) {
+ if (childOperators != null) {
+ for (Operator<? extends OperatorDesc> child : childOperators) {
+ if (child.getNumParent() == 1) {
+ child.setInputContext(inputPath, tableName, partitionName);
+ }
+ }
+ }
+ }
+
public boolean supportSkewJoinOptimization() {
return false;
}
@@ -1264,7 +1290,7 @@ public abstract class Operator<T extends
}
public void setOpTraits(OpTraits metaInfo) {
- if (LOG.isDebugEnabled()) {
+ if (isLogDebugEnabled) {
LOG.debug("Setting traits ("+metaInfo+") on "+this);
}
if (conf != null) {
@@ -1275,7 +1301,7 @@ public abstract class Operator<T extends
}
public void setStatistics(Statistics stats) {
- if (LOG.isDebugEnabled()) {
+ if (isLogDebugEnabled) {
LOG.debug("Setting stats ("+stats+") on "+this);
}
if (conf != null) {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1633283&r1=1633282&r2=1633283&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Tue Oct 21 06:48:11 2014
@@ -25,8 +25,6 @@ import java.util.Arrays;
import java.util.List;
import java.util.Random;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -75,10 +73,6 @@ public class ReduceSinkOperator extends
RECORDS_OUT_INTERMEDIATE
}
- private static final Log LOG = LogFactory.getLog(ReduceSinkOperator.class.getName());
- private static final boolean isInfoEnabled = LOG.isInfoEnabled();
- private static final boolean isDebugEnabled = LOG.isDebugEnabled();
- private static final boolean isTraceEnabled = LOG.isTraceEnabled();
private static final long serialVersionUID = 1L;
private static final MurmurHash hash = (MurmurHash) MurmurHash.getInstance();
@@ -169,7 +163,7 @@ public class ReduceSinkOperator extends
List<ExprNodeDesc> keys = conf.getKeyCols();
- if (isDebugEnabled) {
+ if (isLogDebugEnabled) {
LOG.debug("keys size is " + keys.size());
for (ExprNodeDesc k : keys) {
LOG.debug("Key exprNodeDesc " + k.getExprString());
@@ -214,7 +208,7 @@ public class ReduceSinkOperator extends
tag = conf.getTag();
tagByte[0] = (byte) tag;
skipTag = conf.getSkipTag();
- if (isInfoEnabled) {
+ if (isLogInfoEnabled) {
LOG.info("Using tag = " + tag);
}
@@ -316,7 +310,7 @@ public class ReduceSinkOperator extends
bucketInspector = (IntObjectInspector)bucketField.getFieldObjectInspector();
}
- if (isInfoEnabled) {
+ if (isLogInfoEnabled) {
LOG.info("keys are " + conf.getOutputKeyColumnNames() + " num distributions: " +
conf.getNumDistributionKeys());
}
@@ -362,7 +356,7 @@ public class ReduceSinkOperator extends
if (useUniformHash && partitionEval.length > 0) {
hashCode = computeMurmurHash(firstKey);
} else {
- hashCode = computeHashCode(row);
+ hashCode = computeHashCode(row, bucketNumber);
}
firstKey.setHashCode(hashCode);
@@ -411,7 +405,7 @@ public class ReduceSinkOperator extends
// column directly.
Object recIdValue = acidRowInspector.getStructFieldData(row, recIdField);
buckNum = bucketInspector.get(recIdInspector.getStructFieldData(recIdValue, bucketField));
- if (isTraceEnabled) {
+ if (isLogTraceEnabled) {
LOG.trace("Acid choosing bucket number " + buckNum);
}
} else {
@@ -458,7 +452,7 @@ public class ReduceSinkOperator extends
return hash.hash(firstKey.getBytes(), firstKey.getDistKeyLength(), 0);
}
- private int computeHashCode(Object row) throws HiveException {
+ private int computeHashCode(Object row, int buckNum) throws HiveException {
// Evaluate the HashCode
int keyHashCode = 0;
if (partitionEval.length == 0) {
@@ -482,10 +476,11 @@ public class ReduceSinkOperator extends
+ ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
}
}
- if (isTraceEnabled) {
- LOG.trace("Going to return hash code " + (keyHashCode * 31 + bucketNumber));
+ int hashCode = buckNum < 0 ? keyHashCode : keyHashCode * 31 + buckNum;
+ if (isLogTraceEnabled) {
+ LOG.trace("Going to return hash code " + hashCode);
}
- return bucketNumber < 0 ? keyHashCode : keyHashCode * 31 + bucketNumber;
+ return hashCode;
}
private boolean partitionKeysAreNull(Object row) throws HiveException {
@@ -527,7 +522,7 @@ public class ReduceSinkOperator extends
// forward is not called
if (null != out) {
numRows++;
- if (isInfoEnabled) {
+ if (isLogInfoEnabled) {
if (numRows == cntr) {
cntr *= 10;
LOG.info(toString() + ": records written - " + numRows);
@@ -562,7 +557,7 @@ public class ReduceSinkOperator extends
}
super.closeOp(abort);
out = null;
- if (isInfoEnabled) {
+ if (isLogInfoEnabled) {
LOG.info(toString() + ": records written - " + numRows);
}
recordCounter.set(numRows);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java?rev=1633283&r1=1633282&r2=1633283&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java Tue Oct 21 06:48:11 2014
@@ -283,6 +283,16 @@ public class ScriptOperator extends Oper
return;
}
+ private transient String tableName;
+ private transient String partitionName ;
+
+ @Override
+ public void setInputContext(String inputPath, String tableName, String partitionName) {
+ this.tableName = tableName;
+ this.partitionName = partitionName;
+ super.setInputContext(inputPath, tableName, partitionName);
+ }
+
@Override
public void processOp(Object row, int tag) throws HiveException {
// initialize the user's process only when you receive the first row
@@ -306,10 +316,8 @@ public class ScriptOperator extends Oper
String[] wrappedCmdArgs = addWrapper(cmdArgs);
LOG.info("Executing " + Arrays.asList(wrappedCmdArgs));
- LOG.info("tablename="
- + hconf.get(HiveConf.ConfVars.HIVETABLENAME.varname));
- LOG.info("partname="
- + hconf.get(HiveConf.ConfVars.HIVEPARTITIONNAME.varname));
+ LOG.info("tablename=" + tableName);
+ LOG.info("partname=" + partitionName);
LOG.info("alias=" + alias);
ProcessBuilder pb = new ProcessBuilder(wrappedCmdArgs);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1633283&r1=1633282&r2=1633283&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Tue Oct 21 06:48:11 2014
@@ -2042,15 +2042,21 @@ public final class Utilities {
public static ClassLoader getSessionSpecifiedClassLoader() {
SessionState state = SessionState.get();
if (state == null || state.getConf() == null) {
- LOG.debug("Hive Conf not found or Session not initiated, use thread based class loader instead");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Hive Conf not found or Session not initiated, use thread based class loader instead");
+ }
return JavaUtils.getClassLoader();
}
ClassLoader sessionCL = state.getConf().getClassLoader();
- if (sessionCL != null){
- LOG.debug("Use session specified class loader");
+ if (sessionCL != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Use session specified class loader");
+ }
return sessionCL;
}
- LOG.debug("Session specified class loader not found, use thread based class loader");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Session specified class loader not found, use thread based class loader");
+ }
return JavaUtils.getClassLoader();
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java?rev=1633283&r1=1633282&r2=1633283&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java Tue Oct 21 06:48:11 2014
@@ -40,14 +40,13 @@ public class VectorMapOperator extends M
// The row has been converted to comply with table schema, irrespective of partition schema.
// So, use tblOI (and not partOI) for forwarding
try {
- if (isInfoEnabled) {
- numRows += ((VectorizedRowBatch)value).size;
- while (numRows > cntr) {
- cntr *= 10;
- LOG.info(toString() + ": records read - " + numRows);
+ int childrenDone = 0;
+ for (MapOpCtx current : currentCtxs) {
+ if (!current.forward(value)) {
+ childrenDone++;
}
}
- forward(value, current.getRowObjectInspector());
+ rowForwarded(childrenDone);
} catch (Exception e) {
throw new HiveException("Hive Runtime Error while processing row ", e);
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1633283&r1=1633282&r2=1633283&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Tue Oct 21 06:48:11 2014
@@ -1188,7 +1188,9 @@ public class OrcInputFormat implements
int bucket) throws IOException {
for(FileStatus stat: fs.listStatus(directory)) {
String name = stat.getPath().getName();
- if (Integer.parseInt(name.substring(0, name.indexOf('_'))) == bucket) {
+ String numberPart = name.substring(0, name.indexOf('_'));
+ if (org.apache.commons.lang3.StringUtils.isNumeric(numberPart) &&
+ Integer.parseInt(numberPart) == bucket) {
return stat.getPath();
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java?rev=1633283&r1=1633282&r2=1633283&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java Tue Oct 21 06:48:11 2014
@@ -24,7 +24,7 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
-import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
@@ -76,12 +76,16 @@ public class TableDesc implements Serial
return inputFileFormatClass;
}
+ public Deserializer getDeserializer() throws Exception {
+ return getDeserializer(null);
+ }
+
/**
* Return a deserializer object corresponding to the tableDesc.
*/
- public Deserializer getDeserializer() throws Exception {
+ public Deserializer getDeserializer(Configuration conf) throws Exception {
Deserializer de = getDeserializerClass().newInstance();
- SerDeUtils.initializeSerDe(de, null, properties, null);
+ SerDeUtils.initializeSerDe(de, conf, properties, null);
return de;
}
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java?rev=1633283&r1=1633282&r2=1633283&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java Tue Oct 21 06:48:11 2014
@@ -18,10 +18,7 @@
package org.apache.hadoop.hive.ql.exec;
-import java.io.File;
import java.io.IOException;
-import java.io.OutputStream;
-import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -32,7 +29,6 @@ import java.util.Map;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
@@ -41,7 +37,6 @@ import org.apache.hadoop.hive.ql.parse.T
import org.apache.hadoop.hive.ql.plan.CollectDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.FilterDesc;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
@@ -49,8 +44,6 @@ import org.apache.hadoop.hive.ql.plan.Pl
import org.apache.hadoop.hive.ql.plan.ScriptDesc;
import org.apache.hadoop.hive.ql.plan.SelectDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.hive.ql.processors.CommandProcessor;
-import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
@@ -60,12 +53,9 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.junit.Test;
@@ -279,7 +269,7 @@ public class TestOperators extends TestC
try {
System.out.println("Testing Map Operator");
// initialize configuration
- Configuration hconf = new JobConf(TestOperators.class);
+ JobConf hconf = new JobConf(TestOperators.class);
HiveConf.setVar(hconf, HiveConf.ConfVars.HADOOPMAPFILENAME,
"hdfs:///testDir/testFile");
IOContext.get(hconf.get(Utilities.INPUT_NAME)).setInputPath(
Modified: hive/trunk/ql/src/test/queries/clientpositive/join_vc.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/join_vc.q?rev=1633283&r1=1633282&r2=1633283&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/join_vc.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/join_vc.q Tue Oct 21 06:48:11 2014
@@ -3,3 +3,10 @@
explain select t3.BLOCK__OFFSET__INSIDE__FILE,t3.key,t3.value from src t1 join src t2 on t1.key = t2.key join src t3 on t2.value = t3.value order by t3.BLOCK__OFFSET__INSIDE__FILE,t3.key,t3.value limit 3;
select t3.BLOCK__OFFSET__INSIDE__FILE,t3.key,t3.value from src t1 join src t2 on t1.key = t2.key join src t3 on t2.value = t3.value order by t3.BLOCK__OFFSET__INSIDE__FILE,t3.key,t3.value limit 3;
+
+explain
+select t2.BLOCK__OFFSET__INSIDE__FILE
+from src t1 join src t2 on t1.key = t2.key where t1.key < 100;
+
+select t2.BLOCK__OFFSET__INSIDE__FILE
+from src t1 join src t2 on t1.key = t2.key where t1.key < 100;
Modified: hive/trunk/ql/src/test/results/clientpositive/join_vc.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/join_vc.q.out?rev=1633283&r1=1633282&r2=1633283&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/join_vc.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/join_vc.q.out Tue Oct 21 06:48:11 2014
@@ -137,3 +137,227 @@ POSTHOOK: Input: default@src
0 238 val_238
0 238 val_238
0 238 val_238
+PREHOOK: query: explain
+select t2.BLOCK__OFFSET__INSIDE__FILE
+from src t1 join src t2 on t1.key = t2.key where t1.key < 100
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+select t2.BLOCK__OFFSET__INSIDE__FILE
+from src t1 join src t2 on t1.key = t2.key where t1.key < 100
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+ Stage: Stage-1
+ Map Reduce
+ Map Operator Tree:
+ TableScan
+ alias: t2
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: (key is not null and (key < 100)) (type: boolean)
+ Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: key (type: string)
+ sort order: +
+ Map-reduce partition columns: key (type: string)
+ Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE
+ value expressions: BLOCK__OFFSET__INSIDE__FILE (type: bigint)
+ TableScan
+ alias: t1
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: (key is not null and (key < 100)) (type: boolean)
+ Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE
+ Reduce Output Operator
+ key expressions: key (type: string)
+ sort order: +
+ Map-reduce partition columns: key (type: string)
+ Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE
+ Reduce Operator Tree:
+ Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0
+ 1 {VALUE._col1}
+ outputColumnNames: _col7
+ Statistics: Num rows: 91 Data size: 969 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: _col7 (type: bigint)
+ outputColumnNames: _col0
+ Statistics: Num rows: 91 Data size: 969 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 91 Data size: 969 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: select t2.BLOCK__OFFSET__INSIDE__FILE
+from src t1 join src t2 on t1.key = t2.key where t1.key < 100
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: select t2.BLOCK__OFFSET__INSIDE__FILE
+from src t1 join src t2 on t1.key = t2.key where t1.key < 100
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+2088
+2632
+968
+2088
+2632
+968
+2088
+2632
+968
+2846
+3170
+1720
+4362
+1720
+4362
+386
+2770
+386
+2770
+910
+5340
+5514
+5340
+5514
+2824
+4004
+1118
+4594
+1972
+4594
+1972
+2226
+5284
+2226
+5284
+34
+5616
+3494
+3592
+3192
+3138
+4012
+1238
+3138
+4012
+1238
+3138
+4012
+1238
+5626
+328
+5626
+328
+1218
+3388
+2030
+3298
+2030
+3298
+2330
+4068
+1198
+3060
+4540
+3864
+3060
+4540
+3864
+3060
+4540
+3864
+2308
+1462
+2308
+1462
+4186
+1440
+1024
+1906
+3128
+1906
+3128
+3516
+1592
+198
+1754
+5306
+1754
+5306
+3570
+3794
+4640
+4548
+3794
+4640
+4548
+3794
+4640
+4548
+2792
+1208
+2792
+1208
+3548
+3378
+3538
+3378
+3538
+2622
+3368
+1916
+4058
+396
+5070
+1674
+5070
+1674
+1872
+5606
+1872
+5606
+2612
+12
+2652
+5398
+2802
+5744
+4304
+2802
+5744
+4304
+2802
+5744
+4304
+1176
+3160
+2400
+3160
+2400
+2216
+5572
+5802
+5572
+5802
+92
+2458
+92
+2458
Modified: hive/trunk/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java?rev=1633283&r1=1633282&r2=1633283&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/auth/HiveAuthFactory.java Tue Oct 21 06:48:11 2014
@@ -234,54 +234,62 @@ public class HiveAuthFactory {
// retrieve delegation token for the given user
public String getDelegationToken(String owner, String renewer) throws HiveSQLException {
if (saslServer == null) {
- throw new HiveSQLException("Delegation token only supported over kerberos authentication");
+ throw new HiveSQLException(
+ "Delegation token only supported over kerberos authentication", "08S01");
}
try {
String tokenStr = saslServer.getDelegationTokenWithService(owner, renewer, HS2_CLIENT_TOKEN);
if (tokenStr == null || tokenStr.isEmpty()) {
- throw new HiveSQLException("Received empty retrieving delegation token for user " + owner);
+ throw new HiveSQLException(
+ "Received empty retrieving delegation token for user " + owner, "08S01");
}
return tokenStr;
} catch (IOException e) {
- throw new HiveSQLException("Error retrieving delegation token for user " + owner, e);
+ throw new HiveSQLException(
+ "Error retrieving delegation token for user " + owner, "08S01", e);
} catch (InterruptedException e) {
- throw new HiveSQLException("delegation token retrieval interrupted", e);
+ throw new HiveSQLException("delegation token retrieval interrupted", "08S01", e);
}
}
// cancel given delegation token
public void cancelDelegationToken(String delegationToken) throws HiveSQLException {
if (saslServer == null) {
- throw new HiveSQLException("Delegation token only supported over kerberos authentication");
+ throw new HiveSQLException(
+ "Delegation token only supported over kerberos authentication", "08S01");
}
try {
saslServer.cancelDelegationToken(delegationToken);
} catch (IOException e) {
- throw new HiveSQLException("Error canceling delegation token " + delegationToken, e);
+ throw new HiveSQLException(
+ "Error canceling delegation token " + delegationToken, "08S01", e);
}
}
public void renewDelegationToken(String delegationToken) throws HiveSQLException {
if (saslServer == null) {
- throw new HiveSQLException("Delegation token only supported over kerberos authentication");
+ throw new HiveSQLException(
+ "Delegation token only supported over kerberos authentication", "08S01");
}
try {
saslServer.renewDelegationToken(delegationToken);
} catch (IOException e) {
- throw new HiveSQLException("Error renewing delegation token " + delegationToken, e);
+ throw new HiveSQLException(
+ "Error renewing delegation token " + delegationToken, "08S01", e);
}
}
public String getUserFromToken(String delegationToken) throws HiveSQLException {
if (saslServer == null) {
- throw new HiveSQLException("Delegation token only supported over kerberos authentication");
+ throw new HiveSQLException(
+ "Delegation token only supported over kerberos authentication", "08S01");
}
try {
return saslServer.getUserFromToken(delegationToken);
} catch (IOException e) {
- throw new HiveSQLException("Error extracting user from delegation token " + delegationToken,
- e);
+ throw new HiveSQLException(
+ "Error extracting user from delegation token " + delegationToken, "08S01", e);
}
}
@@ -302,7 +310,7 @@ public class HiveAuthFactory {
}
} catch (IOException e) {
throw new HiveSQLException(
- "Failed to validate proxy privilege of " + realUser + " for " + proxyUser, e);
+ "Failed to validate proxy privilege of " + realUser + " for " + proxyUser, "08S01", e);
}
}