Posted to commits@hive.apache.org by ha...@apache.org on 2013/09/12 03:21:29 UTC
svn commit: r1522098 [15/30] - in /hive/branches/vectorization: ./
beeline/src/test/org/apache/hive/beeline/src/test/ bin/ bin/ext/
common/src/java/org/apache/hadoop/hive/common/
common/src/java/org/apache/hadoop/hive/conf/ conf/ contrib/src/java/org/a...
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Thu Sep 12 01:21:10 2013
@@ -40,8 +40,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Schema;
@@ -58,6 +58,7 @@ import org.apache.hadoop.hive.ql.history
import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.Hook;
import org.apache.hadoop.hive.ql.hooks.HookContext;
+import org.apache.hadoop.hive.ql.hooks.HookUtils;
import org.apache.hadoop.hive.ql.hooks.PostExecute;
import org.apache.hadoop.hive.ql.hooks.PreExecute;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
@@ -82,8 +83,8 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatter;
import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
import org.apache.hadoop.hive.ql.parse.ASTNode;
-import org.apache.hadoop.hive.ql.parse.AbstractSemanticAnalyzerHook;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHook;
import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContextImpl;
import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
@@ -418,25 +419,28 @@ public class Driver implements CommandPr
ctx.setCmd(command);
ctx.setHDFSCleanup(true);
+ perfLogger.PerfLogBegin(LOG, PerfLogger.PARSE);
ParseDriver pd = new ParseDriver();
ASTNode tree = pd.parse(command, ctx);
tree = ParseUtils.findRootNonNullToken(tree);
+ perfLogger.PerfLogEnd(LOG, PerfLogger.PARSE);
+ perfLogger.PerfLogBegin(LOG, PerfLogger.ANALYZE);
BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(conf, tree);
- List<AbstractSemanticAnalyzerHook> saHooks =
+ List<HiveSemanticAnalyzerHook> saHooks =
getHooks(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK,
- AbstractSemanticAnalyzerHook.class);
+ HiveSemanticAnalyzerHook.class);
// Do semantic analysis and plan generation
if (saHooks != null) {
HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
hookCtx.setConf(conf);
- for (AbstractSemanticAnalyzerHook hook : saHooks) {
+ for (HiveSemanticAnalyzerHook hook : saHooks) {
tree = hook.preAnalyze(hookCtx, tree);
}
sem.analyze(tree, ctx);
hookCtx.update(sem);
- for (AbstractSemanticAnalyzerHook hook : saHooks) {
+ for (HiveSemanticAnalyzerHook hook : saHooks) {
hook.postAnalyze(hookCtx, sem.getRootTasks());
}
} else {
@@ -447,6 +451,7 @@ public class Driver implements CommandPr
// validate the plan
sem.validate();
+ perfLogger.PerfLogEnd(LOG, PerfLogger.ANALYZE);
plan = new QueryPlan(command, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN));
@@ -460,12 +465,12 @@ public class Driver implements CommandPr
// serialize the queryPlan
FileOutputStream fos = new FileOutputStream(queryPlanFileName);
- Utilities.serializeObject(plan, fos);
+ Utilities.serializePlan(plan, fos, conf);
fos.close();
// deserialize the queryPlan
FileInputStream fis = new FileInputStream(queryPlanFileName);
- QueryPlan newPlan = Utilities.deserializeObject(fis);
+ QueryPlan newPlan = Utilities.deserializePlan(fis, QueryPlan.class, conf);
fis.close();
// Use the deserialized plan
@@ -949,7 +954,8 @@ public class Driver implements CommandPr
// Get all the driver run hooks and pre-execute them.
List<HiveDriverRunHook> driverRunHooks;
try {
- driverRunHooks = getHooks(HiveConf.ConfVars.HIVE_DRIVER_RUN_HOOKS, HiveDriverRunHook.class);
+ driverRunHooks = getHooks(HiveConf.ConfVars.HIVE_DRIVER_RUN_HOOKS,
+ HiveDriverRunHook.class);
for (HiveDriverRunHook driverRunHook : driverRunHooks) {
driverRunHook.preDriverRun(hookContext);
}
@@ -1064,19 +1070,14 @@ public class Driver implements CommandPr
/**
* Returns a set of hooks specified in a configuration variable.
- *
* See getHooks(HiveConf.ConfVars hookConfVar, Class<T> clazz)
- * @param hookConfVar
- * @return
- * @throws Exception
*/
private List<Hook> getHooks(HiveConf.ConfVars hookConfVar) throws Exception {
return getHooks(hookConfVar, Hook.class);
}
/**
- * Returns the hooks specified in a configuration variable. The hooks are returned in a list in
- * the order they were specified in the configuration variable.
+ * Returns the hooks specified in a configuration variable.
*
* @param hookConfVar The configuration variable specifying a comma separated list of the hook
* class names.
@@ -1085,34 +1086,14 @@ public class Driver implements CommandPr
* they are listed in the value of hookConfVar
* @throws Exception
*/
- private <T extends Hook> List<T> getHooks(HiveConf.ConfVars hookConfVar, Class<T> clazz)
- throws Exception {
-
- List<T> hooks = new ArrayList<T>();
- String csHooks = conf.getVar(hookConfVar);
- if (csHooks == null) {
- return hooks;
- }
-
- csHooks = csHooks.trim();
- if (csHooks.equals("")) {
- return hooks;
- }
-
- String[] hookClasses = csHooks.split(",");
-
- for (String hookClass : hookClasses) {
- try {
- T hook =
- (T) Class.forName(hookClass.trim(), true, JavaUtils.getClassLoader()).newInstance();
- hooks.add(hook);
- } catch (ClassNotFoundException e) {
- console.printError(hookConfVar.varname + " Class not found:" + e.getMessage());
- throw e;
- }
+ private <T extends Hook> List<T> getHooks(ConfVars hookConfVar,
+ Class<T> clazz) throws Exception {
+ try {
+ return HookUtils.getHooks(conf, hookConfVar, clazz);
+ } catch (ClassNotFoundException e) {
+ console.printError(hookConfVar.varname + " Class not found:" + e.getMessage());
+ throw e;
}
-
- return hooks;
}
public int execute() throws CommandNeedRetryException {
@@ -1202,11 +1183,13 @@ public class Driver implements CommandPr
}
perfLogger.PerfLogEnd(LOG, PerfLogger.TIME_TO_SUBMIT);
+ perfLogger.PerfLogBegin(LOG, PerfLogger.RUN_TASKS);
// Loop while you either have tasks running, or tasks queued up
while (running.size() != 0 || runnable.peek() != null) {
// Launch upto maxthreads tasks
while (runnable.peek() != null && running.size() < maxthreads) {
Task<? extends Serializable> tsk = runnable.remove();
+ perfLogger.PerfLogBegin(LOG, PerfLogger.TASK + tsk.getName() + "." + tsk.getId());
launchTask(tsk, queryId, noName, running, jobname, jobs, driverCxt);
}
@@ -1214,6 +1197,7 @@ public class Driver implements CommandPr
TaskResult tskRes = pollTasks(running.keySet());
TaskRunner tskRun = running.remove(tskRes);
Task<? extends Serializable> tsk = tskRun.getTask();
+ perfLogger.PerfLogEnd(LOG, PerfLogger.TASK + tsk.getName() + "." + tsk.getId());
hookContext.addCompleteTask(tskRun);
int exitVal = tskRes.getExitVal();
@@ -1277,6 +1261,7 @@ public class Driver implements CommandPr
}
}
}
+ perfLogger.PerfLogEnd(LOG, PerfLogger.RUN_TASKS);
// in case we decided to run everything in local mode, restore the
// the jobtracker setting to its initial value
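The hook-loading refactor above moves the reflective instantiation loop out of Driver and into the new HookUtils class. The helper's body is not part of this hunk; the following is a minimal sketch reconstructed from the removed inline code (conf.getVar and JavaUtils.getClassLoader are used exactly as the old loop did), so the real HookUtils.getHooks may differ in detail:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.hive.common.JavaUtils;
    import org.apache.hadoop.hive.conf.HiveConf;

    public final class HookUtils {
      // Loads the comma-separated class names configured under hookConfVar and
      // instantiates each via its no-arg constructor, preserving list order.
      public static <T> List<T> getHooks(HiveConf conf,
          HiveConf.ConfVars hookConfVar, Class<T> clazz) throws Exception {
        List<T> hooks = new ArrayList<T>();
        String csHooks = conf.getVar(hookConfVar);
        if (csHooks == null || csHooks.trim().isEmpty()) {
          return hooks;
        }
        for (String hookClass : csHooks.trim().split(",")) {
          // ClassNotFoundException propagates so the caller (Driver) can log
          // the offending configuration variable before rethrowing.
          hooks.add(clazz.cast(Class.forName(hookClass.trim(), true,
              JavaUtils.getClassLoader()).newInstance()));
        }
        return hooks;
      }
    }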
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java Thu Sep 12 01:21:10 2013
@@ -144,7 +144,7 @@ public enum ErrorMsg {
COLUMN_ALIAS_ALREADY_EXISTS(10074, "Column alias already exists:", "42S02"),
UDTF_MULTIPLE_EXPR(10075, "Only a single expression in the SELECT clause is "
+ "supported with UDTF's"),
- UDTF_REQUIRE_AS(10076, "UDTF's require an AS clause"),
+ @Deprecated UDTF_REQUIRE_AS(10076, "UDTF's require an AS clause"),
UDTF_NO_GROUP_BY(10077, "GROUP BY is not supported with a UDTF in the SELECT clause"),
UDTF_NO_SORT_BY(10078, "SORT BY is not supported with a UDTF in the SELECT clause"),
UDTF_NO_CLUSTER_BY(10079, "CLUSTER BY is not supported with a UDTF in the SELECT clause"),
@@ -360,6 +360,8 @@ public enum ErrorMsg {
CANNOT_REPLACE_COLUMNS(10243, "Replace columns is not supported for table {0}. SerDe may be incompatible.", true),
BAD_LOCATION_VALUE(10244, "{0} is not absolute or has no scheme information. Please specify a complete absolute uri with scheme information."),
UNSUPPORTED_ALTER_TBL_OP(10245, "{0} alter table options is not supported"),
+ INVALID_BIGTABLE_MAPJOIN(10246, "{0} table chosen for streaming is not valid", true),
+ MISSING_OVER_CLAUSE(10247, "Missing over clause for function : "),
SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."),
SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. "
@@ -616,8 +618,8 @@ public enum ErrorMsg {
return format(new String[]{reason});
}
/**
- * If the message is parametrized, this will fill the parameters with supplied
- * {@code reasons}, otherwise {@code reasons} are appended at the end of the
+ * If the message is parametrized, this will fill the parameters with supplied
+ * {@code reasons}, otherwise {@code reasons} are appended at the end of the
* message.
*/
public String format(String... reasons) {
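Per the javadoc just above, format() substitutes {0}-style parameters when the message is parametrized and appends the reasons otherwise. An illustrative use of the two new messages added by this commit (expected strings inferred from the templates quoted above):

    // Parametrized message: {0} is filled in.
    ErrorMsg.INVALID_BIGTABLE_MAPJOIN.format("orders");
    //   -> "orders table chosen for streaming is not valid"

    // Plain message: the reason is appended at the end.
    ErrorMsg.MISSING_OVER_CLAUSE.format("rank");
    //   -> "Missing over clause for function : rank"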
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java Thu Sep 12 01:21:10 2013
@@ -23,8 +23,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.persistence.AbstractMapJoinKey;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
@@ -54,7 +53,7 @@ public abstract class AbstractMapJoinOpe
protected transient byte posBigTable = -1; // one of the tables that is not in memory
- protected transient RowContainer<ArrayList<Object>> emptyList = null;
+ protected transient RowContainer<List<Object>> emptyList = null;
transient int numMapRowsRead;
@@ -95,9 +94,9 @@ public abstract class AbstractMapJoinOpe
// all other tables are small, and are cached in the hash table
posBigTable = (byte) conf.getPosBigTable();
- emptyList = new RowContainer<ArrayList<Object>>(1, hconf, reporter);
+ emptyList = new RowContainer<List<Object>>(1, hconf, reporter);
- RowContainer bigPosRC = JoinUtil.getRowContainer(hconf,
+ RowContainer<List<Object>> bigPosRC = JoinUtil.getRowContainer(hconf,
rowContainerStandardObjectInspectors[posBigTable],
posBigTable, joinCacheSize,spillTableDesc, conf,
!hasFilter(posBigTable), reporter);
@@ -160,7 +159,7 @@ public abstract class AbstractMapJoinOpe
}
// returns true if there are elements in key list and any of them is null
- protected boolean hasAnyNulls(AbstractMapJoinKey key) {
+ protected boolean hasAnyNulls(MapJoinKey key) {
return key.hasAnyNulls(nullsafes);
}
}
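The hasAnyNulls() check above guards map-join lookups: a key containing a null can never match under ordinary equality, so such rows are short-circuited unless the corresponding join position uses the null-safe operator (<=>). A minimal sketch of the semantics the renamed MapJoinKey presumably implements (the key-component array here is an assumption for illustration):

    // A null component disqualifies the key only when its position is
    // NOT marked null-safe in the join condition.
    static boolean hasAnyNulls(Object[] keyParts, boolean[] nullsafes) {
      for (int i = 0; i < keyParts.length; i++) {
        if (keyParts[i] == null && (nullsafes == null || !nullsafes[i])) {
          return true;
        }
      }
      return false;
    }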
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnInfo.java Thu Sep 12 01:21:10 2013
@@ -58,6 +58,8 @@ public class ColumnInfo implements Seria
private boolean isHiddenVirtualCol;
+ private String typeName;
+
public ColumnInfo() {
}
@@ -94,6 +96,7 @@ public class ColumnInfo implements Seria
this.tabAlias = tabAlias;
this.isVirtualCol = isVirtualCol;
this.isHiddenVirtualCol = isHiddenVirtualCol;
+ this.typeName = getType().getTypeName();
}
public ColumnInfo(ColumnInfo columnInfo) {
@@ -104,6 +107,15 @@ public class ColumnInfo implements Seria
this.isVirtualCol = columnInfo.getIsVirtualCol();
this.isHiddenVirtualCol = columnInfo.isHiddenVirtualCol();
this.setType(columnInfo.getType());
+ this.typeName = columnInfo.getType().getTypeName();
+ }
+
+ public String getTypeName() {
+ return this.typeName;
+ }
+
+ public void setTypeName(String typeName) {
+ this.typeName = typeName;
}
public TypeInfo getType() {
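ColumnInfo now carries the type name as a plain String alongside the TypeInfo, kept in sync by the constructors shown above via getType().getTypeName(). A small usage sketch (assuming the other ColumnInfo constructors delegate to one that sets typeName):

    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

    ColumnInfo ci = new ColumnInfo("_col0", TypeInfoFactory.intTypeInfo,
        "t1", false);
    // The cached name mirrors the TypeInfo's name:
    assert ci.getTypeName().equals(ci.getType().getTypeName()); // "int"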
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java Thu Sep 12 01:21:10 2013
@@ -93,7 +93,7 @@ public abstract class CommonJoinOperator
protected transient ArrayList<Object>[] dummyObj;
// empty rows for each table
- protected transient RowContainer<ArrayList<Object>>[] dummyObjVectors;
+ protected transient RowContainer<List<Object>>[] dummyObjVectors;
protected transient int totalSz; // total size of the composite object
@@ -108,7 +108,7 @@ public abstract class CommonJoinOperator
// input is too large
// to fit in memory
- AbstractRowContainer<ArrayList<Object>>[] storage; // map b/w table alias
+ AbstractRowContainer<List<Object>>[] storage; // map b/w table alias
// to RowContainer
int joinEmitInterval = -1;
int joinCacheSize = 0;
@@ -274,7 +274,7 @@ public abstract class CommonJoinOperator
}
dummyObj[pos] = nr;
// there should be only 1 dummy object in the RowContainer
- RowContainer<ArrayList<Object>> values = JoinUtil.getRowContainer(hconf,
+ RowContainer<List<Object>> values = JoinUtil.getRowContainer(hconf,
rowContainerStandardObjectInspectors[pos],
alias, 1, spillTableDesc, conf, !hasFilter(pos), reporter);
@@ -283,7 +283,7 @@ public abstract class CommonJoinOperator
// if serde is null, the input doesn't need to be spilled out
// e.g., the output columns does not contains the input table
- RowContainer rc = JoinUtil.getRowContainer(hconf,
+ RowContainer<List<Object>> rc = JoinUtil.getRowContainer(hconf,
rowContainerStandardObjectInspectors[pos],
alias, joinCacheSize, spillTableDesc, conf, !hasFilter(pos), reporter);
storage[pos] = rc;
@@ -328,7 +328,7 @@ public abstract class CommonJoinOperator
public void startGroup() throws HiveException {
LOG.trace("Join: Starting new group");
newGroupStarted = true;
- for (AbstractRowContainer<ArrayList<Object>> alw : storage) {
+ for (AbstractRowContainer<List<Object>> alw : storage) {
alw.clear();
}
super.startGroup();
@@ -443,7 +443,7 @@ public abstract class CommonJoinOperator
private void genJoinObject() throws HiveException {
boolean rightFirst = true;
boolean hasFilter = hasFilter(order[0]);
- AbstractRowContainer<ArrayList<Object>> aliasRes = storage[order[0]];
+ AbstractRowContainer<List<Object>> aliasRes = storage[order[0]];
for (List<Object> rightObj = aliasRes.first(); rightObj != null; rightObj = aliasRes.next()) {
boolean rightNull = rightObj == dummyObj[0];
if (hasFilter) {
@@ -471,7 +471,7 @@ public abstract class CommonJoinOperator
int right = joinCond.getRight();
// search for match in the rhs table
- AbstractRowContainer<ArrayList<Object>> aliasRes = storage[order[aliasNum]];
+ AbstractRowContainer<List<Object>> aliasRes = storage[order[aliasNum]];
boolean done = false;
boolean loopAgain = false;
@@ -641,8 +641,8 @@ public abstract class CommonJoinOperator
private void genUniqueJoinObject(int aliasNum, int forwardCachePos)
throws HiveException {
- AbstractRowContainer<ArrayList<Object>> alias = storage[order[aliasNum]];
- for (ArrayList<Object> row = alias.first(); row != null; row = alias.next()) {
+ AbstractRowContainer<List<Object>> alias = storage[order[aliasNum]];
+ for (List<Object> row = alias.first(); row != null; row = alias.next()) {
int sz = joinValues[order[aliasNum]].size();
int p = forwardCachePos;
for (int j = 0; j < sz; j++) {
@@ -662,7 +662,7 @@ public abstract class CommonJoinOperator
int p = 0;
for (int i = 0; i < numAliases; i++) {
int sz = joinValues[order[i]].size();
- ArrayList<Object> obj = storage[order[i]].first();
+ List<Object> obj = storage[order[i]].first();
for (int j = 0; j < sz; j++) {
forwardCache[p++] = obj.get(j);
}
@@ -684,7 +684,7 @@ public abstract class CommonJoinOperator
boolean allOne = true;
for (int i = 0; i < numAliases; i++) {
Byte alias = order[i];
- AbstractRowContainer<ArrayList<Object>> alw = storage[alias];
+ AbstractRowContainer<List<Object>> alw = storage[alias];
if (alw.size() != 1) {
allOne = false;
@@ -717,7 +717,7 @@ public abstract class CommonJoinOperator
boolean hasEmpty = false;
for (int i = 0; i < numAliases; i++) {
Byte alias = order[i];
- AbstractRowContainer<ArrayList<Object>> alw = storage[alias];
+ AbstractRowContainer<List<Object>> alw = storage[alias];
if (noOuterJoin) {
if (alw.size() == 0) {
@@ -737,7 +737,7 @@ public abstract class CommonJoinOperator
} else {
mayHasMoreThanOne = true;
if (!hasEmpty) {
- for (ArrayList<Object> row = alw.first(); row != null; row = alw.next()) {
+ for (List<Object> row = alw.first(); row != null; row = alw.next()) {
reportProgress();
if (hasAnyFiltered(alias, row)) {
hasEmpty = true;
@@ -784,7 +784,7 @@ public abstract class CommonJoinOperator
@Override
public void closeOp(boolean abort) throws HiveException {
LOG.trace("Join Op close");
- for (AbstractRowContainer<ArrayList<Object>> alw : storage) {
+ for (AbstractRowContainer<List<Object>> alw : storage) {
if (alw != null) {
alw.clear(); // clean up the temp files
}
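Widening the containers from ArrayList<Object> to List<Object> lets every call site in this file depend only on the interface, which the loops above already do: rows are consumed through the container's cursor API, not through ArrayList-specific methods. The access pattern, as it appears repeatedly above:

    // Cursor-style scan over a row container: first() positions at the first
    // row, next() advances, and null signals exhaustion.
    AbstractRowContainer<List<Object>> rows = storage[alias];
    for (List<Object> row = rows.first(); row != null; row = rows.next()) {
      // process one buffered row for this join alias
    }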
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java Thu Sep 12 01:21:10 2013
@@ -31,12 +31,12 @@ import org.apache.hadoop.hive.serde2.obj
*/
public class ExprNodeColumnEvaluator extends ExprNodeEvaluator<ExprNodeColumnDesc> {
- transient boolean simpleCase;
- transient StructObjectInspector inspector;
- transient StructField field;
- transient StructObjectInspector[] inspectors;
- transient StructField[] fields;
- transient boolean[] unionField;
+ private transient boolean simpleCase;
+ private transient StructObjectInspector inspector;
+ private transient StructField field;
+ private transient StructObjectInspector[] inspectors;
+ private transient StructField[] fields;
+ private transient boolean[] unionField;
public ExprNodeColumnEvaluator(ExprNodeColumnDesc expr) {
super(expr);
@@ -61,18 +61,17 @@ public class ExprNodeColumnEvaluator ext
fields = new StructField[names.length];
unionField = new boolean[names.length];
int unionIndex = -1;
-
for (int i = 0; i < names.length; i++) {
if (i == 0) {
inspectors[0] = (StructObjectInspector) rowInspector;
} else {
- if (unionIndex != -1) {
+ if (unionIndex == -1) {
+ inspectors[i] = (StructObjectInspector) fields[i - 1]
+ .getFieldObjectInspector();
+ } else {
inspectors[i] = (StructObjectInspector) (
(UnionObjectInspector)fields[i-1].getFieldObjectInspector()).
getObjectInspectors().get(unionIndex);
- } else {
- inspectors[i] = (StructObjectInspector) fields[i - 1]
- .getFieldObjectInspector();
}
}
// to support names like _colx:1._coly
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java Thu Sep 12 01:21:10 2013
@@ -50,6 +50,11 @@ public class ExtractOperator extends Ope
return OperatorType.EXTRACT;
}
+ @Override
+ public boolean acceptLimitPushdown() {
+ return true;
+ }
+
/**
* @return the name of the operator
*/
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java Thu Sep 12 01:21:10 2013
@@ -75,7 +75,7 @@ public class FetchOperator implements Se
private boolean isNativeTable;
private FetchWork work;
- private Operator<?> operator; // operator tree for processing row further (option)
+ protected Operator<?> operator; // operator tree for processing row further (option)
private int splitNum;
private PartitionDesc currPart;
private TableDesc currTbl;
@@ -396,7 +396,7 @@ public class FetchOperator implements Se
splitNum = 0;
serde = partDesc.getDeserializerClass().newInstance();
- serde.initialize(job, partDesc.getProperties());
+ serde.initialize(job, partDesc.getOverlayedProperties());
if (currTbl != null) {
tblSerde = serde;
@@ -408,7 +408,7 @@ public class FetchOperator implements Se
ObjectInspector outputOI = ObjectInspectorConverters.getConvertedOI(
serde.getObjectInspector(),
- partitionedTableOI == null ? tblSerde.getObjectInspector() : partitionedTableOI);
+ partitionedTableOI == null ? tblSerde.getObjectInspector() : partitionedTableOI, true);
partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(
serde.getObjectInspector(), outputOI);
@@ -416,7 +416,7 @@ public class FetchOperator implements Se
if (LOG.isDebugEnabled()) {
LOG.debug("Creating fetchTask with deserializer typeinfo: "
+ serde.getObjectInspector().getTypeName());
- LOG.debug("deserializer properties: " + partDesc.getProperties());
+ LOG.debug("deserializer properties: " + partDesc.getOverlayedProperties());
}
if (currPart != null) {
@@ -495,6 +495,8 @@ public class FetchOperator implements Se
InspectableObject row = getNextRow();
if (row != null) {
pushRow(row);
+ } else {
+ operator.flush();
}
return row != null;
}
@@ -629,10 +631,10 @@ public class FetchOperator implements Se
for (PartitionDesc listPart : listParts) {
partition = listPart;
Deserializer partSerde = listPart.getDeserializerClass().newInstance();
- partSerde.initialize(job, listPart.getProperties());
+ partSerde.initialize(job, listPart.getOverlayedProperties());
partitionedTableOI = ObjectInspectorConverters.getConvertedOI(
- partSerde.getObjectInspector(), tableOI);
+ partSerde.getObjectInspector(), tableOI, true);
if (!partitionedTableOI.equals(tableOI)) {
break;
}
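Two related changes in FetchOperator: serdes are now initialized from getOverlayedProperties() rather than getProperties(), and getConvertedOI() takes a new boolean argument (its meaning is not visible in this hunk). The conversion being set up is the standard partition-to-table reshaping; a condensed sketch using the calls quoted above:

    // Build an output OI compatible with the table-level inspector, then a
    // converter that reshapes each partition row to it.
    ObjectInspector outputOI = ObjectInspectorConverters.getConvertedOI(
        partSerde.getObjectInspector(),  // source: this partition's shape
        tblSerde.getObjectInspector(),   // target: the table-level shape
        true);                           // flag added by this commit
    ObjectInspectorConverters.Converter converter =
        ObjectInspectorConverters.getConverter(
            partSerde.getObjectInspector(), outputOI);
    Object tableShapedRow = converter.convert(partitionRow);

The new operator.flush() call on row exhaustion presumably gives buffering operators in the fetch-side tree a chance to emit any rows they are still holding.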
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java Thu Sep 12 01:21:10 2013
@@ -41,6 +41,11 @@ public class ForwardOperator extends Ope
return OperatorType.FORWARD;
}
+ @Override
+ public boolean acceptLimitPushdown() {
+ return true;
+ }
+
/**
* @return the name of the operator
*/
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java Thu Sep 12 01:21:10 2013
@@ -23,7 +23,7 @@ import java.lang.reflect.Method;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
+import java.util.EnumMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -149,8 +149,12 @@ import org.apache.hadoop.hive.ql.udf.xml
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils.PrimitiveGrouping;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -478,7 +482,7 @@ public final class FunctionRegistry {
Class<? extends UDF> UDFClass, boolean isOperator, String displayName) {
if (UDF.class.isAssignableFrom(UDFClass)) {
FunctionInfo fI = new FunctionInfo(isNative, displayName,
- new GenericUDFBridge(displayName, isOperator, UDFClass));
+ new GenericUDFBridge(displayName, isOperator, UDFClass.getName()));
mFunctions.put(functionName.toLowerCase(), fI);
} else {
throw new RuntimeException("Registering UDF Class " + UDFClass
@@ -597,24 +601,53 @@ public final class FunctionRegistry {
return synonyms;
}
- static Map<TypeInfo, Integer> numericTypes = new HashMap<TypeInfo, Integer>();
- static List<TypeInfo> numericTypeList = new ArrayList<TypeInfo>();
-
- static void registerNumericType(String typeName, int level) {
- TypeInfo t = TypeInfoFactory.getPrimitiveTypeInfo(typeName);
- numericTypeList.add(t);
- numericTypes.put(t, level);
+ // The ordering of types here is used to determine which numeric types
+ // are common/convertible to one another. Probably better to rely on the
+ // ordering explicitly defined here than to assume that the enum values
+ // that were arbitrarily assigned in PrimitiveCategory work for our purposes.
+ static EnumMap<PrimitiveCategory, Integer> numericTypes =
+ new EnumMap<PrimitiveCategory, Integer>(PrimitiveCategory.class);
+ static List<PrimitiveCategory> numericTypeList = new ArrayList<PrimitiveCategory>();
+
+ static void registerNumericType(PrimitiveCategory primitiveCategory, int level) {
+ numericTypeList.add(primitiveCategory);
+ numericTypes.put(primitiveCategory, level);
}
static {
- registerNumericType(serdeConstants.TINYINT_TYPE_NAME, 1);
- registerNumericType(serdeConstants.SMALLINT_TYPE_NAME, 2);
- registerNumericType(serdeConstants.INT_TYPE_NAME, 3);
- registerNumericType(serdeConstants.BIGINT_TYPE_NAME, 4);
- registerNumericType(serdeConstants.FLOAT_TYPE_NAME, 5);
- registerNumericType(serdeConstants.DOUBLE_TYPE_NAME, 6);
- registerNumericType(serdeConstants.DECIMAL_TYPE_NAME, 7);
- registerNumericType(serdeConstants.STRING_TYPE_NAME, 8);
+ registerNumericType(PrimitiveCategory.BYTE, 1);
+ registerNumericType(PrimitiveCategory.SHORT, 2);
+ registerNumericType(PrimitiveCategory.INT, 3);
+ registerNumericType(PrimitiveCategory.LONG, 4);
+ registerNumericType(PrimitiveCategory.FLOAT, 5);
+ registerNumericType(PrimitiveCategory.DOUBLE, 6);
+ registerNumericType(PrimitiveCategory.DECIMAL, 7);
+ registerNumericType(PrimitiveCategory.STRING, 8);
+ }
+
+ /**
+ * Given 2 TypeInfo types and the PrimitiveCategory selected as the common class between the two,
+ * return a TypeInfo corresponding to the common PrimitiveCategory, and with type qualifiers
+ * (if applicable) that match the 2 TypeInfo types.
+ * Examples:
+ * varchar(10), varchar(20), primitive category varchar => varchar(20)
+ * date, string, primitive category string => string
+ * @param a TypeInfo of the first type
+ * @param b TypeInfo of the second type
+ * @param typeCategory PrimitiveCategory of the designated common type between a and b
+ * @return TypeInfo represented by the primitive category, with any applicable type qualifiers.
+ */
+ public static TypeInfo getTypeInfoForPrimitiveCategory(
+ PrimitiveTypeInfo a, PrimitiveTypeInfo b, PrimitiveCategory typeCategory) {
+ // For types with parameters (like varchar), we need to determine the type parameters
+ // that should be added to this type, based on the original 2 TypeInfos.
+ switch (typeCategory) {
+
+ default:
+ // Type doesn't require any qualifiers.
+ return TypeInfoFactory.getPrimitiveTypeInfo(
+ PrimitiveObjectInspectorUtils.getTypeEntryFromPrimitiveCategory(typeCategory).typeName);
+ }
}
/**
@@ -624,18 +657,38 @@ public final class FunctionRegistry {
if (a.equals(b)) {
return a;
}
+ if (a.getCategory() != Category.PRIMITIVE || b.getCategory() != Category.PRIMITIVE) {
+ return null;
+ }
+ PrimitiveCategory pcA = ((PrimitiveTypeInfo)a).getPrimitiveCategory();
+ PrimitiveCategory pcB = ((PrimitiveTypeInfo)b).getPrimitiveCategory();
+
+ if (pcA == pcB) {
+ // Same primitive category but different qualifiers.
+ return getTypeInfoForPrimitiveCategory((PrimitiveTypeInfo)a, (PrimitiveTypeInfo)b, pcA);
+ }
+
+ PrimitiveGrouping pgA = PrimitiveObjectInspectorUtils.getPrimitiveGrouping(pcA);
+ PrimitiveGrouping pgB = PrimitiveObjectInspectorUtils.getPrimitiveGrouping(pcB);
+ // handle string types properly
+ if (pgA == PrimitiveGrouping.STRING_GROUP && pgB == PrimitiveGrouping.STRING_GROUP) {
+ return getTypeInfoForPrimitiveCategory(
+ (PrimitiveTypeInfo)a, (PrimitiveTypeInfo)b,PrimitiveCategory.STRING);
+ }
+
if (FunctionRegistry.implicitConvertable(a, b)) {
- return b;
+ return getTypeInfoForPrimitiveCategory((PrimitiveTypeInfo)a, (PrimitiveTypeInfo)b, pcB);
}
if (FunctionRegistry.implicitConvertable(b, a)) {
- return a;
+ return getTypeInfoForPrimitiveCategory((PrimitiveTypeInfo)a, (PrimitiveTypeInfo)b, pcA);
}
- for (TypeInfo t : numericTypeList) {
- if (FunctionRegistry.implicitConvertable(a, t)
- && FunctionRegistry.implicitConvertable(b, t)) {
- return t;
+ for (PrimitiveCategory t : numericTypeList) {
+ if (FunctionRegistry.implicitConvertable(pcA, t)
+ && FunctionRegistry.implicitConvertable(pcB, t)) {
+ return getTypeInfoForPrimitiveCategory((PrimitiveTypeInfo)a, (PrimitiveTypeInfo)b, t);
}
}
+
return null;
}
@@ -653,12 +706,34 @@ public final class FunctionRegistry {
if (a.equals(b)) {
return a;
}
- for (TypeInfo t : numericTypeList) {
- if (FunctionRegistry.implicitConvertable(a, t)
- && FunctionRegistry.implicitConvertable(b, t)) {
- return t;
+ if (a.getCategory() != Category.PRIMITIVE || b.getCategory() != Category.PRIMITIVE) {
+ return null;
+ }
+ PrimitiveCategory pcA = ((PrimitiveTypeInfo)a).getPrimitiveCategory();
+ PrimitiveCategory pcB = ((PrimitiveTypeInfo)b).getPrimitiveCategory();
+
+ if (pcA == pcB) {
+ // Same primitive category but different qualifiers.
+ // Rely on getTypeInfoForPrimitiveCategory() to sort out the type params.
+ return getTypeInfoForPrimitiveCategory((PrimitiveTypeInfo)a, (PrimitiveTypeInfo)b, pcA);
+ }
+
+ PrimitiveGrouping pgA = PrimitiveObjectInspectorUtils.getPrimitiveGrouping(pcA);
+ PrimitiveGrouping pgB = PrimitiveObjectInspectorUtils.getPrimitiveGrouping(pcB);
+ // handle string types properly
+ if (pgA == PrimitiveGrouping.STRING_GROUP && pgB == PrimitiveGrouping.STRING_GROUP) {
+ // Compare as strings. Char comparison semantics may be different if/when implemented.
+ return getTypeInfoForPrimitiveCategory(
+ (PrimitiveTypeInfo)a, (PrimitiveTypeInfo)b,PrimitiveCategory.STRING);
+ }
+
+ for (PrimitiveCategory t : numericTypeList) {
+ if (FunctionRegistry.implicitConvertable(pcA, t)
+ && FunctionRegistry.implicitConvertable(pcB, t)) {
+ return getTypeInfoForPrimitiveCategory((PrimitiveTypeInfo)a, (PrimitiveTypeInfo)b, t);
}
}
+
return null;
}
@@ -674,45 +749,60 @@ public final class FunctionRegistry {
if (a.equals(b)) {
return a;
}
- Integer ai = numericTypes.get(a);
- Integer bi = numericTypes.get(b);
+ if (a.getCategory() != Category.PRIMITIVE || b.getCategory() != Category.PRIMITIVE) {
+ return null;
+ }
+ PrimitiveCategory pcA = ((PrimitiveTypeInfo)a).getPrimitiveCategory();
+ PrimitiveCategory pcB = ((PrimitiveTypeInfo)b).getPrimitiveCategory();
+
+ PrimitiveGrouping pgA = PrimitiveObjectInspectorUtils.getPrimitiveGrouping(pcA);
+ PrimitiveGrouping pgB = PrimitiveObjectInspectorUtils.getPrimitiveGrouping(pcB);
+ // handle string types properly
+ if (pgA == PrimitiveGrouping.STRING_GROUP && pgB == PrimitiveGrouping.STRING_GROUP) {
+ return getTypeInfoForPrimitiveCategory(
+ (PrimitiveTypeInfo)a, (PrimitiveTypeInfo)b,PrimitiveCategory.STRING);
+ }
+
+ Integer ai = numericTypes.get(pcA);
+ Integer bi = numericTypes.get(pcB);
if (ai == null || bi == null) {
// If either is not a numeric type, return null.
return null;
}
- return (ai > bi) ? a : b;
+ PrimitiveCategory pcCommon = (ai > bi) ? pcA : pcB;
+ return getTypeInfoForPrimitiveCategory((PrimitiveTypeInfo)a, (PrimitiveTypeInfo)b, pcCommon);
}
- /**
- * Returns whether it is possible to implicitly convert an object of Class
- * from to Class to.
- */
- public static boolean implicitConvertable(TypeInfo from, TypeInfo to) {
- if (from.equals(to)) {
+ public static boolean implicitConvertable(PrimitiveCategory from, PrimitiveCategory to) {
+ if (from == to) {
return true;
}
+
+ PrimitiveGrouping fromPg = PrimitiveObjectInspectorUtils.getPrimitiveGrouping(from);
+ PrimitiveGrouping toPg = PrimitiveObjectInspectorUtils.getPrimitiveGrouping(to);
+
// Allow implicit String to Double conversion
- if (from.equals(TypeInfoFactory.stringTypeInfo)
- && to.equals(TypeInfoFactory.doubleTypeInfo)) {
+ if (fromPg == PrimitiveGrouping.STRING_GROUP && to == PrimitiveCategory.DOUBLE) {
return true;
}
// Allow implicit String to Decimal conversion
- if (from.equals(TypeInfoFactory.stringTypeInfo)
- && to.equals(TypeInfoFactory.decimalTypeInfo)) {
+ if (fromPg == PrimitiveGrouping.STRING_GROUP && to == PrimitiveCategory.DECIMAL) {
return true;
}
// Void can be converted to any type
- if (from.equals(TypeInfoFactory.voidTypeInfo)) {
+ if (from == PrimitiveCategory.VOID) {
return true;
}
// Allow implicit String to Date conversion
- if (from.equals(TypeInfoFactory.dateTypeInfo)
- && to.equals(TypeInfoFactory.stringTypeInfo)) {
+ if (fromPg == PrimitiveGrouping.DATE_GROUP && toPg == PrimitiveGrouping.STRING_GROUP) {
return true;
}
-
- if (from.equals(TypeInfoFactory.timestampTypeInfo)
- && to.equals(TypeInfoFactory.stringTypeInfo)) {
+ // Allow implicit Numeric to String conversion
+ if (fromPg == PrimitiveGrouping.NUMERIC_GROUP && toPg == PrimitiveGrouping.STRING_GROUP) {
+ return true;
+ }
+ // Allow implicit String to varchar conversion, and vice versa
+ if (fromPg == PrimitiveGrouping.STRING_GROUP && toPg == PrimitiveGrouping.STRING_GROUP) {
return true;
}
@@ -730,6 +820,27 @@ public final class FunctionRegistry {
}
/**
+ * Returns whether it is possible to implicitly convert an object of Class
+ * from to Class to.
+ */
+ public static boolean implicitConvertable(TypeInfo from, TypeInfo to) {
+ if (from.equals(to)) {
+ return true;
+ }
+
+ // Reimplemented to use PrimitiveCategory rather than TypeInfo, because
+ // 2 TypeInfos from the same qualified type (varchar, decimal) should still be
+ // seen as equivalent.
+ if (from.getCategory() == Category.PRIMITIVE && to.getCategory() == Category.PRIMITIVE) {
+ return implicitConvertable(
+ ((PrimitiveTypeInfo)from).getPrimitiveCategory(),
+ ((PrimitiveTypeInfo)to).getPrimitiveCategory());
+ }
+ return false;
+ }
+
+
+ /**
* Get the GenericUDAF evaluator for the name and argumentClasses.
*
* @param name
@@ -956,6 +1067,59 @@ public final class FunctionRegistry {
}
/**
+ * Given a set of candidate methods and list of argument types, try to
+ * select the best candidate based on how close the passed argument types are
+ * to the candidate argument types.
+ * For a varchar argument, we would prefer evaluate(string) over evaluate(double).
+ * @param udfMethods list of candidate methods
+ * @param argumentsPassed list of argument types to match to the candidate methods
+ */
+ static void filterMethodsByTypeAffinity(List<Method> udfMethods, List<TypeInfo> argumentsPassed) {
+ if (udfMethods.size() > 1) {
+ // Prefer methods with a closer signature based on the primitive grouping of each argument.
+ // Score each method based on its similarity to the passed argument types.
+ int currentScore = 0;
+ int bestMatchScore = 0;
+ Method bestMatch = null;
+ for (Method m: udfMethods) {
+ currentScore = 0;
+ List<TypeInfo> argumentsAccepted =
+ TypeInfoUtils.getParameterTypeInfos(m, argumentsPassed.size());
+ Iterator<TypeInfo> argsPassedIter = argumentsPassed.iterator();
+ for (TypeInfo acceptedType : argumentsAccepted) {
+ // Check the affinity of the argument passed in with the accepted argument,
+ // based on the PrimitiveGrouping
+ TypeInfo passedType = argsPassedIter.next();
+ if (acceptedType.getCategory() == Category.PRIMITIVE
+ && passedType.getCategory() == Category.PRIMITIVE) {
+ PrimitiveGrouping acceptedPg = PrimitiveObjectInspectorUtils.getPrimitiveGrouping(
+ ((PrimitiveTypeInfo) acceptedType).getPrimitiveCategory());
+ PrimitiveGrouping passedPg = PrimitiveObjectInspectorUtils.getPrimitiveGrouping(
+ ((PrimitiveTypeInfo) passedType).getPrimitiveCategory());
+ if (acceptedPg == passedPg) {
+ // The passed argument matches somewhat closely with an accepted argument
+ ++currentScore;
+ }
+ }
+ }
+ // Check if the score for this method is any better relative to others
+ if (currentScore > bestMatchScore) {
+ bestMatchScore = currentScore;
+ bestMatch = m;
+ } else if (currentScore == bestMatchScore) {
+ bestMatch = null; // no longer a best match if more than one.
+ }
+ }
+
+ if (bestMatch != null) {
+ // Found a best match during this processing, use it.
+ udfMethods.clear();
+ udfMethods.add(bestMatch);
+ }
+ }
+ }
+
+ /**
* Gets the closest matching method corresponding to the argument list from a
* list of methods.
*
@@ -1025,6 +1189,13 @@ public final class FunctionRegistry {
// No matching methods found
throw new NoMatchingMethodException(udfClass, argumentsPassed, mlist);
}
+
+ if (udfMethods.size() > 1) {
+ // First try selecting methods based on the type affinity of the arguments passed
+ // to the candidate method arguments.
+ filterMethodsByTypeAffinity(udfMethods, argumentsPassed);
+ }
+
if (udfMethods.size() > 1) {
// if the only difference is numeric types, pick the method
@@ -1050,9 +1221,15 @@ public final class FunctionRegistry {
for (TypeInfo accepted: argumentsAccepted) {
TypeInfo reference = referenceIterator.next();
- if (numericTypes.containsKey(accepted)) {
+ boolean acceptedIsPrimitive = false;
+ PrimitiveCategory acceptedPrimCat = PrimitiveCategory.UNKNOWN;
+ if (accepted.getCategory() == Category.PRIMITIVE) {
+ acceptedIsPrimitive = true;
+ acceptedPrimCat = ((PrimitiveTypeInfo) accepted).getPrimitiveCategory();
+ }
+ if (acceptedIsPrimitive && numericTypes.containsKey(acceptedPrimCat)) {
// We're looking for the udf with the smallest maximum numeric type.
- int typeValue = numericTypes.get(accepted);
+ int typeValue = numericTypes.get(acceptedPrimCat);
maxNumericType = typeValue > maxNumericType ? typeValue : maxNumericType;
} else if (!accepted.equals(reference)) {
// There are non-numeric arguments that don't match from one UDF to
@@ -1107,7 +1284,7 @@ public final class FunctionRegistry {
if (genericUDF instanceof GenericUDFBridge) {
GenericUDFBridge bridge = (GenericUDFBridge) genericUDF;
return new GenericUDFBridge(bridge.getUdfName(), bridge.isOperator(),
- bridge.getUdfClass());
+ bridge.getUdfClassName());
} else if (genericUDF instanceof GenericUDFMacro) {
GenericUDFMacro bridge = (GenericUDFMacro) genericUDF;
return new GenericUDFMacro(bridge.getMacroName(), bridge.getBody(),
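The common-type logic above now resolves types through PrimitiveCategory plus the explicit numeric precedence table (BYTE=1 through STRING=8), so that qualified variants of one type (e.g. varchar(10) vs varchar(20)) fall into the same category and string-group types are handled uniformly. A condensed sketch of the precedence rule used by getCommonClassForComparison:

    import java.util.EnumMap;
    import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;

    // With the registration order above, the common numeric category is
    // simply the higher-ranked of the two; null means "not numeric here".
    static PrimitiveCategory commonNumericCategory(
        EnumMap<PrimitiveCategory, Integer> numericTypes,
        PrimitiveCategory a, PrimitiveCategory b) {
      Integer ai = numericTypes.get(a);
      Integer bi = numericTypes.get(b);
      if (ai == null || bi == null) {
        return null;
      }
      return ai > bi ? a : b;
    }

    // e.g. INT (3) vs FLOAT (5) -> FLOAT, matching the (ai > bi) ? pcA : pcB
    // expression in getCommonClassForComparison above.

The new filterMethodsByTypeAffinity() pass acts as a tie-breaker over candidate UDF methods: it scores each method by how many of its parameters fall in the same PrimitiveGrouping as the corresponding passed argument, keeps a unique best scorer, and deliberately drops the tie-break when two methods score equally.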
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Thu Sep 12 01:21:10 2013
@@ -60,7 +60,6 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.UnionObject;
@@ -180,7 +179,7 @@ public class GroupByOperator extends Ope
* Current number of entries in the hash table.
*/
protected transient int numEntriesHashTable;
- transient int countAfterReport;
+ transient int countAfterReport; // report or forward
transient int heartbeatInterval;
public static FastBitSet groupingSet2BitSet(int value) {
@@ -244,15 +243,15 @@ public class GroupByOperator extends Ope
// reduce KEY has union field as the last field if there are distinct
// aggregates in group-by.
List<? extends StructField> sfs =
- ((StandardStructObjectInspector) rowInspector).getAllStructFieldRefs();
+ ((StructObjectInspector) rowInspector).getAllStructFieldRefs();
if (sfs.size() > 0) {
StructField keyField = sfs.get(0);
if (keyField.getFieldName().toUpperCase().equals(
Utilities.ReduceField.KEY.name())) {
ObjectInspector keyObjInspector = keyField.getFieldObjectInspector();
- if (keyObjInspector instanceof StandardStructObjectInspector) {
+ if (keyObjInspector instanceof StructObjectInspector) {
List<? extends StructField> keysfs =
- ((StandardStructObjectInspector) keyObjInspector).getAllStructFieldRefs();
+ ((StructObjectInspector) keyObjInspector).getAllStructFieldRefs();
if (keysfs.size() > 0) {
// the last field is the union field, if any
StructField sf = keysfs.get(keysfs.size() - 1);
@@ -1197,4 +1196,15 @@ public class GroupByOperator extends Ope
public OperatorType getType() {
return OperatorType.GROUPBY;
}
+
+ /**
+ * we can push the limit above GBY (running in Reducer), since that will generate single row
+ * for each group. This doesn't necessarily hold for GBY (running in Mappers),
+ * so we don't push limit above it.
+ */
+ @Override
+ public boolean acceptLimitPushdown() {
+ return getConf().getMode() == GroupByDesc.Mode.MERGEPARTIAL ||
+ getConf().getMode() == GroupByDesc.Mode.COMPLETE;
+ }
}
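The acceptLimitPushdown() additions in this commit (ExtractOperator, ForwardOperator, and the mode-restricted GroupByOperator above) let the planner decide whether a LIMIT can safely travel past an operator: a reducer-side group-by (MERGEPARTIAL/COMPLETE) emits exactly one row per group, whereas a map-side/HASH group-by emits partial aggregates that still need merging, so cutting rows there would lose data. A hypothetical illustration of how a pushdown pass could consult the flag (the traversal itself is an assumption; the actual optimizer logic lives elsewhere):

    import java.util.List;
    import org.apache.hadoop.hive.ql.plan.OperatorDesc;

    // Walk upstream from a LIMIT through single-parent chains; pushdown is
    // only considered while every operator on the path accepts it.
    static boolean limitPushdownSafe(Operator<? extends OperatorDesc> op) {
      while (op != null) {
        if (!op.acceptLimitPushdown()) {
          return false;
        }
        List<Operator<? extends OperatorDesc>> parents = op.getParentOperators();
        if (parents == null || parents.size() != 1) {
          return true;  // stop at a fan-in or a source
        }
        op = parents.get(0);
      }
      return true;
    }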
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableDummyOperator.java Thu Sep 12 01:21:10 2013
@@ -38,7 +38,7 @@ public class HashTableDummyOperator exte
this.outputObjInspector = serde.getObjectInspector();
initializeChildren(hconf);
} catch (Exception e) {
- LOG.error("Generating output obj inspector from dummy object error");
+ LOG.error("Generating output obj inspector from dummy object error", e);
e.printStackTrace();
}
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java Thu Sep 12 01:21:10 2013
@@ -17,7 +17,8 @@
*/
package org.apache.hadoop.hive.ql.exec;
-import java.io.File;
+import java.io.BufferedOutputStream;
+import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
@@ -28,11 +29,13 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.persistence.AbstractMapJoinKey;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectValue;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer;
-import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.HashTableSinkDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -41,10 +44,8 @@ import org.apache.hadoop.hive.ql.session
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.util.ReflectionUtils;
@@ -54,128 +55,57 @@ public class HashTableSinkOperator exten
private static final long serialVersionUID = 1L;
private static final Log LOG = LogFactory.getLog(HashTableSinkOperator.class.getName());
- protected static MapJoinMetaData metadata = new MapJoinMetaData();
- // from abstract map join operator
/**
* The expressions for join inputs's join keys.
*/
- protected transient List<ExprNodeEvaluator>[] joinKeys;
+ private transient List<ExprNodeEvaluator>[] joinKeys;
/**
* The ObjectInspectors for the join inputs's join keys.
*/
- protected transient List<ObjectInspector>[] joinKeysObjectInspectors;
- /**
- * The standard ObjectInspectors for the join inputs's join keys.
- */
- protected transient List<ObjectInspector>[] joinKeysStandardObjectInspectors;
-
- protected transient int posBigTableAlias = -1; // one of the tables that is not in memory
+ private transient List<ObjectInspector>[] joinKeysObjectInspectors;
- protected transient RowContainer<ArrayList<Object>> emptyList = null;
+ private transient int posBigTableAlias = -1; // one of the tables that is not in memory
- transient int numMapRowsRead;
- protected transient int totalSz; // total size of the composite object
- transient boolean firstRow;
/**
* The filters for join
*/
- protected transient List<ExprNodeEvaluator>[] joinFilters;
+ private transient List<ExprNodeEvaluator>[] joinFilters;
- protected transient int[][] filterMaps;
+ private transient int[][] filterMaps;
- protected transient int numAliases; // number of aliases
/**
* The expressions for join outputs.
*/
- protected transient List<ExprNodeEvaluator>[] joinValues;
+ private transient List<ExprNodeEvaluator>[] joinValues;
/**
* The ObjectInspectors for the join inputs.
*/
- protected transient List<ObjectInspector>[] joinValuesObjectInspectors;
+ private transient List<ObjectInspector>[] joinValuesObjectInspectors;
/**
* The ObjectInspectors for join filters.
*/
- protected transient List<ObjectInspector>[] joinFilterObjectInspectors;
- /**
- * The standard ObjectInspectors for the join inputs.
- */
- protected transient List<ObjectInspector>[] joinValuesStandardObjectInspectors;
+ private transient List<ObjectInspector>[] joinFilterObjectInspectors;
- protected transient List<ObjectInspector>[] rowContainerStandardObjectInspectors;
-
- protected transient Byte[] order; // order in which the results should
- Configuration hconf;
- protected transient Byte alias;
- protected transient TableDesc[] spillTableDesc; // spill tables are
-
- protected transient HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>[] mapJoinTables;
- protected transient boolean noOuterJoin;
+ private transient Byte[] order; // order in which the results should
+ private Configuration hconf;
+ private transient Byte alias;
+
+ private transient MapJoinTableContainer[] mapJoinTables;
+ private transient MapJoinTableContainerSerDe[] mapJoinTableSerdes;
+
+ private static final Object[] EMPTY_OBJECT_ARRAY = new Object[0];
+ private static final MapJoinRowContainer EMPTY_ROW_CONTAINER = new MapJoinRowContainer();
+ static {
+ EMPTY_ROW_CONTAINER.add(EMPTY_OBJECT_ARRAY);
+ }
+
+ private transient boolean noOuterJoin;
private long rowNumber = 0;
- protected transient LogHelper console;
+ private transient LogHelper console;
private long hashTableScale;
- private boolean isAbort = false;
-
- public static class HashTableSinkObjectCtx {
- ObjectInspector standardOI;
- SerDe serde;
- TableDesc tblDesc;
- Configuration conf;
- boolean hasFilter;
-
- /**
- * @param standardOI
- * @param serde
- */
- public HashTableSinkObjectCtx(ObjectInspector standardOI, SerDe serde, TableDesc tblDesc,
- boolean hasFilter, Configuration conf) {
- this.standardOI = standardOI;
- this.serde = serde;
- this.tblDesc = tblDesc;
- this.hasFilter = hasFilter;
- this.conf = conf;
- }
-
- /**
- * @return the standardOI
- */
- public ObjectInspector getStandardOI() {
- return standardOI;
- }
-
- /**
- * @return the serde
- */
- public SerDe getSerDe() {
- return serde;
- }
-
- public TableDesc getTblDesc() {
- return tblDesc;
- }
-
- public boolean hasFilterTag() {
- return hasFilter;
- }
-
- public Configuration getConf() {
- return conf;
- }
-
- }
-
- public static MapJoinMetaData getMetadata() {
- return metadata;
- }
-
- private static final transient String[] FATAL_ERR_MSG = {
- null, // counter value 0 means no error
- "Mapside join exceeds available memory. "
- + "Please try removing the mapjoin hint."};
- private final int metadataKeyTag = -1;
- transient int[] metadataValueTag;
-
-
+ private MapJoinMemoryExhaustionHandler memoryExhaustionHandler;
+
public HashTableSinkOperator() {
}
@@ -189,8 +119,7 @@ public class HashTableSinkOperator exten
protected void initializeOp(Configuration hconf) throws HiveException {
boolean isSilent = HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVESESSIONSILENT);
console = new LogHelper(LOG, isSilent);
- numMapRowsRead = 0;
- firstRow = true;
+ memoryExhaustionHandler = new MapJoinMemoryExhaustionHandler(console, conf.getHashtableMemoryUsage());
// for small tables only; so get the big table position first
posBigTableAlias = conf.getPosBigTable();
@@ -198,9 +127,7 @@ public class HashTableSinkOperator exten
order = conf.getTagOrder();
// initialize some variables, which used to be initialized in CommonJoinOperator
- numAliases = conf.getExprs().size();
this.hconf = hconf;
- totalSz = 0;
noOuterJoin = conf.isNoOuterJoin();
filterMaps = conf.getFilterMap();
@@ -212,16 +139,12 @@ public class HashTableSinkOperator exten
JoinUtil.populateJoinKeyValue(joinKeys, conf.getKeys(), posBigTableAlias);
joinKeysObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinKeys,
inputObjInspectors, posBigTableAlias, tagLen);
- joinKeysStandardObjectInspectors = JoinUtil.getStandardObjectInspectors(
- joinKeysObjectInspectors, posBigTableAlias, tagLen);
// process join values
joinValues = new List[tagLen];
JoinUtil.populateJoinKeyValue(joinValues, conf.getExprs(), posBigTableAlias);
joinValuesObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinValues,
inputObjInspectors, posBigTableAlias, tagLen);
- joinValuesStandardObjectInspectors = JoinUtil.getStandardObjectInspectors(
- joinValuesObjectInspectors, posBigTableAlias, tagLen);
// process join filters
joinFilters = new List[tagLen];
@@ -229,9 +152,7 @@ public class HashTableSinkOperator exten
joinFilterObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinFilters,
inputObjInspectors, posBigTableAlias, tagLen);
- if (noOuterJoin) {
- rowContainerStandardObjectInspectors = joinValuesStandardObjectInspectors;
- } else {
+ if (!noOuterJoin) {
List<ObjectInspector>[] rowContainerObjectInspectors = new List[tagLen];
for (Byte alias : order) {
if (alias == posBigTableAlias) {
@@ -245,43 +166,43 @@ public class HashTableSinkOperator exten
}
rowContainerObjectInspectors[alias] = rcOIs;
}
- rowContainerStandardObjectInspectors = getStandardObjectInspectors(
- rowContainerObjectInspectors, tagLen);
- }
-
- metadataValueTag = new int[numAliases];
- for (int pos = 0; pos < numAliases; pos++) {
- metadataValueTag[pos] = -1;
}
- mapJoinTables = new HashMapWrapper[tagLen];
-
+ mapJoinTables = new MapJoinTableContainer[tagLen];
+ mapJoinTableSerdes = new MapJoinTableContainerSerDe[tagLen];
int hashTableThreshold = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD);
float hashTableLoadFactor = HiveConf.getFloatVar(hconf,
HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR);
- float hashTableMaxMemoryUsage = this.getConf().getHashtableMemoryUsage();
-
hashTableScale = HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVEHASHTABLESCALE);
if (hashTableScale <= 0) {
hashTableScale = 1;
}
-
- // initialize the hash tables for other tables
- for (Byte pos : order) {
- if (pos == posBigTableAlias) {
- continue;
+ try {
+ TableDesc keyTableDesc = conf.getKeyTblDesc();
+ SerDe keySerde = (SerDe) ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(),
+ null);
+ keySerde.initialize(null, keyTableDesc.getProperties());
+ MapJoinObjectSerDeContext keyContext = new MapJoinObjectSerDeContext(keySerde, false);
+ for (Byte pos : order) {
+ if (pos == posBigTableAlias) {
+ continue;
+ }
+ mapJoinTables[pos] = new HashMapWrapper(hashTableThreshold, hashTableLoadFactor);
+ TableDesc valueTableDesc = conf.getValueTblFilteredDescs().get(pos);
+ SerDe valueSerDe = (SerDe) ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(), null);
+ valueSerDe.initialize(null, valueTableDesc.getProperties());
+ mapJoinTableSerdes[pos] = new MapJoinTableContainerSerDe(keyContext, new MapJoinObjectSerDeContext(
+ valueSerDe, hasFilter(pos)));
}
-
- HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = new HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>(
- hashTableThreshold, hashTableLoadFactor, hashTableMaxMemoryUsage);
-
- mapJoinTables[pos] = hashTable;
+ } catch (SerDeException e) {
+ throw new HiveException(e);
}
}
- protected static List<ObjectInspector>[] getStandardObjectInspectors(
+ private static List<ObjectInspector>[] getStandardObjectInspectors(
List<ObjectInspector>[] aliasToObjectInspectors, int maxTag) {
+ @SuppressWarnings("unchecked")
List<ObjectInspector>[] result = new List[maxTag];
for (byte alias = 0; alias < aliasToObjectInspectors.length; alias++) {
List<ObjectInspector> oiList = aliasToObjectInspectors[alias];
@@ -299,104 +220,45 @@ public class HashTableSinkOperator exten
}
- private void setKeyMetaData() throws SerDeException {
- TableDesc keyTableDesc = conf.getKeyTblDesc();
- SerDe keySerializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(),
- null);
- keySerializer.initialize(null, keyTableDesc.getProperties());
-
- metadata.put(Integer.valueOf(metadataKeyTag), new HashTableSinkObjectCtx(
- ObjectInspectorUtils.getStandardObjectInspector(keySerializer.getObjectInspector(),
- ObjectInspectorCopyOption.WRITABLE), keySerializer, keyTableDesc, false, hconf));
- }
-
- private boolean hasFilter(int alias) {
- return filterMaps != null && filterMaps[alias] != null;
- }
/*
* This operator only processes small tables: read the key/value pairs and load them into the hashtable.
*/
@Override
public void processOp(Object row, int tag) throws HiveException {
- // let the mapJoinOp process these small tables
- try {
- if (firstRow) {
- // generate the map metadata
- setKeyMetaData();
- firstRow = false;
- }
- alias = (byte)tag;
-
- // compute keys and values as StandardObjects
- AbstractMapJoinKey keyMap = JoinUtil.computeMapJoinKeys(row, joinKeys[alias],
- joinKeysObjectInspectors[alias]);
-
- Object[] value = JoinUtil.computeMapJoinValues(row, joinValues[alias],
- joinValuesObjectInspectors[alias], joinFilters[alias], joinFilterObjectInspectors[alias],
- filterMaps == null ? null : filterMaps[alias]);
-
- HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = mapJoinTables[alias];
-
- MapJoinObjectValue o = hashTable.get(keyMap);
- MapJoinRowContainer<Object[]> res = null;
-
- boolean needNewKey = true;
- if (o == null) {
- res = new MapJoinRowContainer<Object[]>();
- res.add(value);
-
- if (metadataValueTag[tag] == -1) {
- metadataValueTag[tag] = order[tag];
- setValueMetaData(tag);
- }
-
- // Construct externalizable objects for key and value
- if (needNewKey) {
- MapJoinObjectValue valueObj = new MapJoinObjectValue(
- metadataValueTag[tag], res);
-
- rowNumber++;
- if (rowNumber > hashTableScale && rowNumber % hashTableScale == 0) {
- isAbort = hashTable.isAbort(rowNumber, console);
- if (isAbort) {
- throw new HiveException("RunOutOfMeomoryUsage");
- }
- }
- hashTable.put(keyMap, valueObj);
- }
-
+ alias = (byte)tag;
+ // compute keys and values as StandardObjects
+ MapJoinKey key = JoinUtil.computeMapJoinKeys(null, row, joinKeys[alias],
+ joinKeysObjectInspectors[alias]);
+ Object[] value = EMPTY_OBJECT_ARRAY;
+ if((hasFilter(alias) && filterMaps[alias].length > 0) || joinValues[alias].size() > 0) {
+ value = JoinUtil.computeMapJoinValues(row, joinValues[alias],
+ joinValuesObjectInspectors[alias], joinFilters[alias], joinFilterObjectInspectors[alias],
+ filterMaps == null ? null : filterMaps[alias]);
+ }
+ MapJoinTableContainer tableContainer = mapJoinTables[alias];
+ MapJoinRowContainer rowContainer = tableContainer.get(key);
+ if (rowContainer == null) {
+ if(value.length != 0) {
+ rowContainer = new MapJoinRowContainer();
+ rowContainer.add(value);
} else {
- res = o.getObj();
- res.add(value);
+ rowContainer = EMPTY_ROW_CONTAINER;
}
-
-
- } catch (SerDeException e) {
- throw new HiveException(e);
+ rowNumber++;
+ if (rowNumber > hashTableScale && rowNumber % hashTableScale == 0) {
+ memoryExhaustionHandler.checkMemoryStatus(tableContainer.size(), rowNumber);
+ }
+ tableContainer.put(key, rowContainer);
+ } else if (rowContainer == EMPTY_ROW_CONTAINER) {
+ rowContainer = rowContainer.copy();
+ rowContainer.add(value);
+ tableContainer.put(key, rowContainer);
+ } else {
+ rowContainer.add(value);
}
-
}
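
The rewritten processOp above replaces the per-row MapJoinObjectValue allocation with a shared EMPTY_ROW_CONTAINER sentinel plus copy-on-write, and moves the abort check into MapJoinMemoryExhaustionHandler, consulted only every hashTableScale rows. A minimal, self-contained sketch of both idioms follows; the table, scale, and checkMemory names are illustrative, and the Runtime-based heuristic merely stands in for the real handler:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SentinelSketch {
    // All value-less keys share this one immutable, empty container.
    private static final List<Object[]> EMPTY_ROWS = new ArrayList<Object[]>();

    private final Map<String, List<Object[]>> table = new HashMap<String, List<Object[]>>();
    private long rowNumber = 0;
    private final long scale = 100000; // only probe memory every `scale` rows

    public void put(String key, Object[] value) {
        List<Object[]> rows = table.get(key);
        if (rows == null) {
            if (value.length != 0) {
                rows = new ArrayList<Object[]>();
                rows.add(value);
            } else {
                rows = EMPTY_ROWS; // no allocation for a zero-width value
            }
            rowNumber++;
            if (rowNumber > scale && rowNumber % scale == 0) {
                checkMemory(); // fail fast before the JVM itself runs out
            }
            table.put(key, rows);
        } else if (rows == EMPTY_ROWS) {
            rows = new ArrayList<Object[]>(rows); // copy-on-write off the sentinel
            rows.add(value);
            table.put(key, rows);
        } else {
            rows.add(value); // ordinary append to an already-private container
        }
    }

    private void checkMemory() {
        Runtime rt = Runtime.getRuntime();
        double used = (double) (rt.totalMemory() - rt.freeMemory()) / rt.maxMemory();
        if (used > 0.9) { // the threshold is configurable in the real handler
            throw new RuntimeException("hashtable exceeds the memory limit");
        }
    }

    public static void main(String[] args) {
        SentinelSketch sketch = new SentinelSketch();
        sketch.put("a", new Object[0]);       // shares the sentinel
        sketch.put("b", new Object[]{"x"});   // gets a private container
        sketch.put("a", new Object[0]);       // detaches "a" from the sentinel
        System.out.println(sketch.table.keySet());
    }
}
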
-
- private void setValueMetaData(int tag) throws SerDeException {
- TableDesc valueTableDesc = conf.getValueTblFilteredDescs().get(tag);
- SerDe valueSerDe = (SerDe) ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(),
- null);
-
- valueSerDe.initialize(null, valueTableDesc.getProperties());
-
- List<ObjectInspector> newFields = rowContainerStandardObjectInspectors[alias];
- int length = newFields.size();
- List<String> newNames = new ArrayList<String>(length);
- for (int i = 0; i < length; i++) {
- String tmp = new String("tmp_" + i);
- newNames.add(tmp);
- }
- StandardStructObjectInspector standardOI = ObjectInspectorFactory
- .getStandardStructObjectInspector(newNames, newFields);
-
- int alias = Integer.valueOf(metadataValueTag[tag]);
- metadata.put(Integer.valueOf(metadataValueTag[tag]), new HashTableSinkObjectCtx(
- standardOI, valueSerDe, valueTableDesc, hasFilter(alias), hconf));
+ private boolean hasFilter(int alias) {
+ return filterMaps != null && filterMaps[alias] != null;
}
@Override
@@ -405,42 +267,36 @@ public class HashTableSinkOperator exten
if (mapJoinTables != null) {
// get tmp file URI
String tmpURI = this.getExecContext().getLocalWork().getTmpFileURI();
- LOG.info("Get TMP URI: " + tmpURI);
- long fileLength;
+ LOG.info("Temp URI for side table: " + tmpURI);
for (byte tag = 0; tag < mapJoinTables.length; tag++) {
// get the key and value
- HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = mapJoinTables[tag];
- if (hashTable == null) {
+ MapJoinTableContainer tableContainer = mapJoinTables[tag];
+ if (tableContainer == null) {
continue;
}
-
// get current input file name
String bigBucketFileName = getExecContext().getCurrentBigBucketFile();
-
String fileName = getExecContext().getLocalWork().getBucketFileName(bigBucketFileName);
-
// get the tmp URI path; it will be a hdfs path if not local mode
String dumpFilePrefix = conf.getDumpFilePrefix();
String tmpURIPath = Utilities.generatePath(tmpURI, dumpFilePrefix, tag, fileName);
- hashTable.isAbort(rowNumber, console);
- console.printInfo(Utilities.now() + "\tDump the hashtable into file: " + tmpURIPath);
+ console.printInfo(Utilities.now() + "\tDump the side-table into file: " + tmpURIPath);
// get the hashtable file and path
Path path = new Path(tmpURIPath);
FileSystem fs = path.getFileSystem(hconf);
- File file = new File(path.toUri().getPath());
- fs.create(path);
- fileLength = hashTable.flushMemoryCacheToPersistent(file);
- console.printInfo(Utilities.now() + "\tUpload 1 File to: " + tmpURIPath + " File size: "
- + fileLength);
-
- hashTable.close();
+ ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(fs.create(path), 4096));
+ try {
+ mapJoinTableSerdes[tag].persist(out, tableContainer);
+ } finally {
+ out.close();
+ }
+ tableContainer.clear();
+ console.printInfo(Utilities.now() + "\tUpload 1 File to: " + tmpURIPath);
}
}
-
super.closeOp(abort);
} catch (Exception e) {
- LOG.error("Generate Hashtable error", e);
- e.printStackTrace();
+ LOG.error("Error generating side-table", e);
}
}
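
The rewritten closeOp above no longer flushes a HashMapWrapper itself; it hands a buffered ObjectOutputStream to the serde and clears the container afterwards. A sketch of that dump pattern, assuming a plain local FileOutputStream in place of fs.create(path) and a Persister interface standing in for MapJoinTableContainerSerDe:

import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.HashMap;

public class DumpSketch {
    interface Persister { // stand-in for MapJoinTableContainerSerDe.persist
        void persist(ObjectOutputStream out, HashMap<String, String> table) throws IOException;
    }

    static void dump(String path, HashMap<String, String> table, Persister serde)
            throws IOException {
        // Buffer the stream (the patch uses a 4096-byte buffer) and close it in
        // a finally block, exactly as closeOp now does.
        ObjectOutputStream out = new ObjectOutputStream(
                new BufferedOutputStream(new FileOutputStream(path), 4096));
        try {
            serde.persist(out, table);
        } finally {
            out.close();
        }
        table.clear(); // release the in-memory side table once it is on disk
    }

    public static void main(String[] args) throws IOException {
        HashMap<String, String> table = new HashMap<String, String>();
        table.put("k", "v");
        dump(System.getProperty("java.io.tmpdir") + "/sidetable.dump", table,
                new Persister() {
                    public void persist(ObjectOutputStream out, HashMap<String, String> t)
                            throws IOException {
                        out.writeObject(t); // trivial stand-in serialization
                    }
                });
    }
}
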
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Thu Sep 12 01:21:10 2013
@@ -32,7 +32,6 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.plan.JoinDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.serde2.SerDeUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.LongWritable;
@@ -114,8 +113,8 @@ public class JoinOperator extends Common
// Add the value to the vector
// if join-key is null, process each row in different group.
- StandardStructObjectInspector inspector =
- (StandardStructObjectInspector) sf.getFieldObjectInspector();
+ StructObjectInspector inspector =
+ (StructObjectInspector) sf.getFieldObjectInspector();
if (SerDeUtils.hasAnyNullObject(keyObject, inspector, nullsafes)) {
endGroup();
startGroup();
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java Thu Sep 12 01:21:10 2013
@@ -23,10 +23,7 @@ import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.persistence.AbstractMapJoinKey;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinDoubleKeys;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectKey;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinSingleKey;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -147,42 +144,22 @@ public class JoinUtil {
/**
* Return the key as a standard object. StandardObject can be inspected by a
- * standard ObjectInspector.
+ * standard ObjectInspector. The first parameter, a MapJoinKey, can
+ * be null if the caller would like a new object to be instantiated.
*/
- public static AbstractMapJoinKey computeMapJoinKeys(Object row,
+ public static MapJoinKey computeMapJoinKeys(MapJoinKey key, Object row,
List<ExprNodeEvaluator> keyFields, List<ObjectInspector> keyFieldsOI)
throws HiveException {
-
int size = keyFields.size();
- if(size == 1){
- Object obj = (ObjectInspectorUtils.copyToStandardObject(keyFields.get(0)
- .evaluate(row), keyFieldsOI.get(0),
- ObjectInspectorCopyOption.WRITABLE));
- MapJoinSingleKey key = new MapJoinSingleKey(obj);
- return key;
- }else if(size == 2){
- Object obj1 = (ObjectInspectorUtils.copyToStandardObject(keyFields.get(0)
- .evaluate(row), keyFieldsOI.get(0),
- ObjectInspectorCopyOption.WRITABLE));
-
- Object obj2 = (ObjectInspectorUtils.copyToStandardObject(keyFields.get(1)
- .evaluate(row), keyFieldsOI.get(1),
- ObjectInspectorCopyOption.WRITABLE));
-
- MapJoinDoubleKeys key = new MapJoinDoubleKeys(obj1,obj2);
- return key;
- }else{
- // Compute the keys
- Object[] nr = new Object[keyFields.size()];
- for (int i = 0; i < keyFields.size(); i++) {
-
- nr[i] = (ObjectInspectorUtils.copyToStandardObject(keyFields.get(i)
- .evaluate(row), keyFieldsOI.get(i),
- ObjectInspectorCopyOption.WRITABLE));
- }
- MapJoinObjectKey key = new MapJoinObjectKey(nr);
- return key;
- }
+ if(key == null || key.getKey().length != size) {
+ key = new MapJoinKey(new Object[size]);
+ }
+ Object[] array = key.getKey();
+ for (int keyIndex = 0; keyIndex < size; keyIndex++) {
+ array[keyIndex] = (ObjectInspectorUtils.copyToStandardObject(keyFields.get(keyIndex)
+ .evaluate(row), keyFieldsOI.get(keyIndex), ObjectInspectorCopyOption.WRITABLE));
+ }
+ return key;
}
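
The unified computeMapJoinKeys drops the one/two/many-key specializations in favor of a single Object[]-backed MapJoinKey that callers may recycle. A sketch of the reuse contract, with Key standing in for MapJoinKey and plain row values in place of the ExprNodeEvaluator/ObjectInspector copying:

import java.util.Arrays;
import java.util.List;

public class KeyReuseSketch {
    static final class Key { // stand-in for MapJoinKey
        private final Object[] key;
        Key(Object[] key) { this.key = key; }
        Object[] getKey() { return key; }
    }

    // Pass key == null on the first row; pass the returned key back afterwards.
    static Key computeKeys(Key key, List<Object> row, int size) {
        if (key == null || key.getKey().length != size) {
            key = new Key(new Object[size]); // allocate only when the arity changes
        }
        Object[] array = key.getKey();
        for (int i = 0; i < size; i++) {
            array[i] = row.get(i); // the real code copies to a standard writable object
        }
        return key;
    }

    public static void main(String[] args) {
        Key key = null;
        for (List<Object> row : Arrays.<List<Object>>asList(
                Arrays.<Object>asList(1, "a"), Arrays.<Object>asList(2, "b"))) {
            key = computeKeys(key, row, 2); // same Key instance on the second pass
            System.out.println(Arrays.toString(key.getKey()));
        }
    }
}

Note how the two call sites in this commit differ: HashTableSinkOperator passes null because each key is retained inside the hashtable, while MapJoinOperator keeps one transient key and recycles it, since a probe key is never stored.
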
@@ -354,7 +331,7 @@ public class JoinUtil {
}
- public static RowContainer getRowContainer(Configuration hconf,
+ public static RowContainer<List<Object>> getRowContainer(Configuration hconf,
List<ObjectInspector> structFieldObjectInspectors,
Byte alias,int containerSize, TableDesc[] spillTableDesc,
JoinDesc conf,boolean noFilter, Reporter reporter) throws HiveException {
@@ -366,7 +343,7 @@ public class JoinUtil {
containerSize = -1;
}
- RowContainer rc = new RowContainer(containerSize, hconf, reporter);
+ RowContainer<List<Object>> rc = new RowContainer<List<Object>>(containerSize, hconf, reporter);
StructObjectInspector rcOI = null;
if (tblDesc != null) {
// arbitrary column names used internally for serializing to spill table
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Thu Sep 12 01:21:10 2013
@@ -18,6 +18,9 @@
package org.apache.hadoop.hive.ql.exec;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.ArrayList;
@@ -27,20 +30,17 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx;
-import org.apache.hadoop.hive.ql.exec.persistence.AbstractMapJoinKey;
-import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectValue;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.util.ReflectionUtils;
@@ -51,23 +51,15 @@ public class MapJoinOperator extends Abs
private static final long serialVersionUID = 1L;
private static final Log LOG = LogFactory.getLog(MapJoinOperator.class.getName());
-
- protected transient HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>[] mapJoinTables;
-
- protected static MapJoinMetaData metadata = new MapJoinMetaData();
- public static MapJoinMetaData getMetadata() {
- return metadata;
- }
-
private static final transient String[] FATAL_ERR_MSG = {
null, // counter value 0 means no error
"Mapside join exceeds available memory. "
+ "Please try removing the mapjoin hint."};
- protected transient MapJoinRowContainer<ArrayList<Object>>[] rowContainerMap;
- transient int metadataKeyTag;
- transient int[] metadataValueTag;
- transient boolean hashTblInitedOnce;
+ private transient MapJoinTableContainer[] mapJoinTables;
+ private transient MapJoinTableContainerSerDe[] mapJoinTableSerdes;
+ private transient boolean hashTblInitedOnce;
+ private transient MapJoinKey key;
public MapJoinOperator() {
}
@@ -77,35 +69,11 @@ public class MapJoinOperator extends Abs
}
@Override
- @SuppressWarnings("unchecked")
protected void initializeOp(Configuration hconf) throws HiveException {
-
super.initializeOp(hconf);
-
- metadataValueTag = new int[numAliases];
- for (int pos = 0; pos < numAliases; pos++) {
- metadataValueTag[pos] = -1;
- }
-
- metadataKeyTag = -1;
-
int tagLen = conf.getTagLength();
-
- mapJoinTables = new HashMapWrapper[tagLen];
- rowContainerMap = new MapJoinRowContainer[tagLen];
- // initialize the hash tables for other tables
- for (int pos = 0; pos < numAliases; pos++) {
- if (pos == posBigTable) {
- continue;
- }
-
- HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = new HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>();
-
- mapJoinTables[pos] = hashTable;
- MapJoinRowContainer<ArrayList<Object>> rowContainer = new MapJoinRowContainer<ArrayList<Object>>();
- rowContainerMap[pos] = rowContainer;
- }
-
+ mapJoinTables = new MapJoinTableContainer[tagLen];
+ mapJoinTableSerdes = new MapJoinTableContainerSerDe[tagLen];
hashTblInitedOnce = false;
}
@@ -118,14 +86,12 @@ public class MapJoinOperator extends Abs
public void generateMapMetaData() throws HiveException, SerDeException {
// generate the meta data for key
// index for key is -1
+
TableDesc keyTableDesc = conf.getKeyTblDesc();
SerDe keySerializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(),
null);
keySerializer.initialize(null, keyTableDesc.getProperties());
- metadata.put(Integer.valueOf(metadataKeyTag), new HashTableSinkObjectCtx(
- ObjectInspectorUtils.getStandardObjectInspector(keySerializer.getObjectInspector(),
- ObjectInspectorCopyOption.WRITABLE), keySerializer, keyTableDesc, false, hconf));
-
+ MapJoinObjectSerDeContext keyContext = new MapJoinObjectSerDeContext(keySerializer, false);
for (int pos = 0; pos < order.length; pos++) {
if (pos == posBigTable) {
continue;
@@ -139,16 +105,12 @@ public class MapJoinOperator extends Abs
SerDe valueSerDe = (SerDe) ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(),
null);
valueSerDe.initialize(null, valueTableDesc.getProperties());
-
- ObjectInspector inspector = valueSerDe.getObjectInspector();
- metadata.put(Integer.valueOf(pos), new HashTableSinkObjectCtx(ObjectInspectorUtils
- .getStandardObjectInspector(inspector, ObjectInspectorCopyOption.WRITABLE),
- valueSerDe, valueTableDesc, hasFilter(pos), hconf));
+ MapJoinObjectSerDeContext valueContext = new MapJoinObjectSerDeContext(valueSerDe, hasFilter(pos));
+ mapJoinTableSerdes[pos] = new MapJoinTableContainerSerDe(keyContext, valueContext);
}
}
private void loadHashTable() throws HiveException {
-
if (!this.getExecContext().getLocalWork().getInputFileChangeSensitive()) {
if (hashTblInitedOnce) {
return;
@@ -158,12 +120,9 @@ public class MapJoinOperator extends Abs
}
String baseDir = null;
-
String currentInputFile = getExecContext().getCurrentInputFile();
LOG.info("******* Load from HashTable File: input : " + currentInputFile);
-
String fileName = getExecContext().getLocalWork().getBucketFileName(currentInputFile);
-
try {
if (ShimLoader.getHadoopShims().isLocalMode(hconf)) {
baseDir = this.getExecContext().getLocalWork().getTmpFileURI();
@@ -183,18 +142,25 @@ public class MapJoinOperator extends Abs
baseDir = archiveLocalLink.toUri().getPath();
}
}
- for (byte pos = 0; pos < mapJoinTables.length; pos++) {
- HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashtable = mapJoinTables[pos];
- if (hashtable == null) {
+ for (int pos = 0; pos < mapJoinTables.length; pos++) {
+ if (pos == posBigTable) {
continue;
}
- String filePath = Utilities.generatePath(baseDir, conf.getDumpFilePrefix(), pos, fileName);
+ if(baseDir == null) {
+ throw new IllegalStateException("baseDir cannot be null");
+ }
+ String filePath = Utilities.generatePath(baseDir, conf.getDumpFilePrefix(), (byte)pos, fileName);
Path path = new Path(filePath);
- LOG.info("\tLoad back 1 hashtable file from tmp file uri:" + path.toString());
- hashtable.initilizePersistentHash(path.toUri().getPath());
+ LOG.info("\tLoad back 1 hashtable file from tmp file uri:" + path);
+ ObjectInputStream in = new ObjectInputStream(new BufferedInputStream(
+ new FileInputStream(path.toUri().getPath()), 4096));
+ try {
+ mapJoinTables[pos] = mapJoinTableSerdes[pos].load(in);
+ } finally {
+ in.close();
+ }
}
} catch (Exception e) {
- LOG.error("Load Distributed Cache Error", e);
throw new HiveException(e);
}
}
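
The load half mirrors the dump: a buffered ObjectInputStream over the local copy of the file, closed in a finally block even if deserialization throws midway. A self-checking sketch, with Loader standing in for MapJoinTableContainerSerDe.load:

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;

public class LoadSketch {
    interface Loader { // stand-in for MapJoinTableContainerSerDe.load
        HashMap<String, String> load(ObjectInputStream in)
                throws IOException, ClassNotFoundException;
    }

    static HashMap<String, String> loadTable(String path, Loader serde)
            throws IOException, ClassNotFoundException {
        ObjectInputStream in = new ObjectInputStream(
                new BufferedInputStream(new FileInputStream(path), 4096));
        try {
            return serde.load(in);
        } finally {
            in.close(); // close even when readObject fails midway
        }
    }

    public static void main(String[] args) throws Exception {
        // Round-trip through a temp file so the sketch checks itself.
        File f = File.createTempFile("sidetable", ".dump");
        ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(f));
        try {
            HashMap<String, String> t = new HashMap<String, String>();
            t.put("k", "v");
            out.writeObject(t);
        } finally {
            out.close();
        }
        HashMap<String, String> loaded = loadTable(f.getPath(), new Loader() {
            @SuppressWarnings("unchecked")
            public HashMap<String, String> load(ObjectInputStream in)
                    throws IOException, ClassNotFoundException {
                return (HashMap<String, String>) in.readObject();
            }
        });
        System.out.println(loaded); // {k=v}
    }
}
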
@@ -208,39 +174,31 @@ public class MapJoinOperator extends Abs
generateMapMetaData();
firstRow = false;
}
-
loadHashTable();
} catch (SerDeException e) {
- e.printStackTrace();
throw new HiveException(e);
}
}
@Override
public void processOp(Object row, int tag) throws HiveException {
-
try {
if (firstRow) {
// generate the map metadata
generateMapMetaData();
firstRow = false;
}
-
alias = (byte)tag;
// compute keys and values as StandardObjects
- AbstractMapJoinKey key = JoinUtil.computeMapJoinKeys(row, joinKeys[alias],
+ key = JoinUtil.computeMapJoinKeys(key, row, joinKeys[alias],
joinKeysObjectInspectors[alias]);
-
boolean joinNeeded = false;
for (byte pos = 0; pos < order.length; pos++) {
if (pos != alias) {
-
- MapJoinObjectValue o = mapJoinTables[pos].get(key);
- MapJoinRowContainer<ArrayList<Object>> rowContainer = rowContainerMap[pos];
-
+ MapJoinRowContainer rowContainer = mapJoinTables[pos].get(key);
// there is no join value, or the join key has all-null elements
- if (o == null || key.hasAnyNulls(nullsafes)) {
+ if (rowContainer == null || key.hasAnyNulls(nullsafes)) {
if (!noOuterJoin) {
joinNeeded = true;
storage[pos] = dummyObjVectors[pos];
@@ -249,45 +207,36 @@ public class MapJoinOperator extends Abs
}
} else {
joinNeeded = true;
- rowContainer.reset(o.getObj());
- storage[pos] = rowContainer;
- aliasFilterTags[pos] = o.getAliasFilter();
+ storage[pos] = rowContainer.copy();
+ aliasFilterTags[pos] = rowContainer.getAliasFilter();
}
}
}
-
if (joinNeeded) {
ArrayList<Object> value = getFilteredValue(alias, row);
-
// Add the value to the ArrayList
storage[alias].add(value);
-
// generate the output records
checkAndGenObject();
}
-
// done with the row
storage[tag].clear();
-
for (byte pos = 0; pos < order.length; pos++) {
if (pos != tag) {
storage[pos] = null;
}
}
-
} catch (SerDeException e) {
- e.printStackTrace();
throw new HiveException(e);
}
}
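
On the probe side, a miss (or an all-null key under non-null-safe semantics) yields the dummy null-padded row only when an outer join requires it, and a hit installs a copy of the matched rows so downstream mutation cannot touch the shared table. A stripped-down sketch of that branch; sideTable, DUMMY_ROW, and noOuterJoin are illustrative names, not the operator's fields:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ProbeSketch {
    private static final List<String> DUMMY_ROW = Arrays.asList((String) null);

    public static void main(String[] args) {
        Map<String, List<String>> sideTable = new HashMap<String, List<String>>();
        sideTable.put("k1", Arrays.asList("v1"));
        boolean noOuterJoin = false;

        for (String probeKey : new String[]{"k1", "k2"}) {
            List<String> matches = sideTable.get(probeKey);
            boolean joinNeeded = false;
            List<String> storage = null;
            if (matches == null) {
                if (!noOuterJoin) {   // outer join: emit the dummy (null-padded) row
                    joinNeeded = true;
                    storage = DUMMY_ROW;
                }                     // inner join: drop the probe row entirely
            } else {
                joinNeeded = true;
                storage = new ArrayList<String>(matches); // copy, as rowContainer.copy() does
            }
            if (joinNeeded) {
                System.out.println(probeKey + " -> " + storage);
            }
        }
    }
}
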
@Override
public void closeOp(boolean abort) throws HiveException {
-
if (mapJoinTables != null) {
- for (HashMapWrapper<?, ?> hashTable : mapJoinTables) {
- if (hashTable != null) {
- hashTable.close();
+ for (MapJoinTableContainer tableContainer : mapJoinTables) {
+ if (tableContainer != null) {
+ tableContainer.clear();
}
}
}