Posted to commits@hive.apache.org by ha...@apache.org on 2013/03/02 23:38:08 UTC
svn commit: r1451954 [5/27] - in /hive/branches/ptf-windowing: ./
cli/src/java/org/apache/hadoop/hive/cli/ common/src/java/conf/
common/src/java/org/apache/hadoop/hive/conf/ conf/
contrib/src/test/results/clientpositive/ data/conf/ data/files/ hbase-ha...
Modified: hive/branches/ptf-windowing/metastore/src/test/org/apache/hadoop/hive/metastore/TestPartitionNameWhitelistPreEventHook.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/metastore/src/test/org/apache/hadoop/hive/metastore/TestPartitionNameWhitelistPreEventHook.java?rev=1451954&r1=1451953&r2=1451954&view=diff
==============================================================================
--- hive/branches/ptf-windowing/metastore/src/test/org/apache/hadoop/hive/metastore/TestPartitionNameWhitelistPreEventHook.java (original)
+++ hive/branches/ptf-windowing/metastore/src/test/org/apache/hadoop/hive/metastore/TestPartitionNameWhitelistPreEventHook.java Sat Mar 2 22:37:59 2013
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.metastore;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import junit.framework.Assert;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.events.PreAddPartitionEvent;
-import org.apache.hadoop.hive.metastore.events.PreEventContext;
-import org.junit.Test;
-
-// Validate PartitionNameWhitelistPreEventListener to ensure it refuses to process
-// a partition add or append request if partition fields contain
-// Unicode characters or commas
-
-public class TestPartitionNameWhitelistPreEventHook {
-
- // Runs an instance of PartitionNameWhitelistPreEventListener
- // Returns whether or not it succeeded
- private boolean runHook(PreEventContext event) {
-
- Configuration config = new Configuration();
-
- // match the printable ASCII characters except for commas
- config.set(HiveConf.ConfVars.METASTORE_PARTITION_NAME_WHITELIST_PATTERN.varname
- , "[\\x20-\\x7E&&[^,]]*");
-
- PartitionNameWhitelistPreEventListener hook =
- new PartitionNameWhitelistPreEventListener(config);
-
- try {
- hook.onEvent(event);
- } catch (Exception e) {
- return false;
- }
-
- return true;
- }
-
- // Sample data
- private List<String> getPartValsWithUnicode() {
-
- List<String> partVals = new ArrayList<String>();
- partVals.add("klâwen");
- partVals.add("tägelîch");
-
- return partVals;
-
- }
-
- private List<String> getPartValsWithCommas() {
-
- List<String> partVals = new ArrayList<String>();
- partVals.add("a,b");
- partVals.add("c,d,e,f");
-
- return partVals;
-
- }
-
- private List<String> getPartValsWithValidCharacters() {
-
- List<String> partVals = new ArrayList<String>();
- partVals.add("part1");
- partVals.add("part2");
-
- return partVals;
-
- }
-
- @Test
- public void testAddPartitionWithCommas() {
-
- Partition partition = new Partition();
- partition.setValues(getPartValsWithCommas());
-
- PreAddPartitionEvent event = new PreAddPartitionEvent(partition, null);
-
- Assert.assertFalse("Add a partition with commas in name",
- runHook(event));
- }
-
- @Test
- public void testAddPartitionWithUnicode() {
-
- Partition partition = new Partition();
- partition.setValues(getPartValsWithUnicode());
-
- PreAddPartitionEvent event = new PreAddPartitionEvent(partition, null);
-
- Assert.assertFalse("Add a partition with unicode characters in name",
- runHook(event));
- }
-
- @Test
- public void testAddPartitionWithValidPartVal() {
-
- Partition p = new Partition();
- p.setValues(getPartValsWithValidCharacters());
-
- PreAddPartitionEvent event = new PreAddPartitionEvent(p, null);
-
- Assert.assertTrue("Add a partition with unicode characters in name",
- runHook(event));
- }
-
- @Test
- public void testAppendPartitionWithUnicode() {
-
- Partition p = new Partition();
- p.setValues(getPartValsWithUnicode());
-
- PreAddPartitionEvent event = new PreAddPartitionEvent(p, null);
-
- Assert.assertFalse("Append a partition with unicode characters in name",
- runHook(event));
- }
-
- @Test
- public void testAppendPartitionWithCommas() {
-
- Partition p = new Partition();
- p.setValues(getPartValsWithCommas());
-
- PreAddPartitionEvent event = new PreAddPartitionEvent(p, null);
-
- Assert.assertFalse("Append a partition with unicode characters in name",
- runHook(event));
- }
-
- @Test
- public void testAppendPartitionWithValidCharacters() {
-
- Partition p = new Partition();
- p.setValues(getPartValsWithValidCharacters());
-
- PreAddPartitionEvent event = new PreAddPartitionEvent(p, null);
-
- Assert.assertTrue("Append a partition with no unicode characters in name",
- runHook(event));
- }
-
-}
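
The whitelist pattern the deleted test configured, [\x20-\x7E&&[^,]]*, uses Java character-class intersection: printable ASCII minus the comma. A minimal standalone sketch of the check such a listener performs on partition values (class and method names here are illustrative, not the metastore API):

    import java.util.Arrays;
    import java.util.List;
    import java.util.regex.Pattern;

    public class WhitelistCheck {
      // Printable ASCII (\x20-\x7E) intersected with "anything but a comma".
      private static final Pattern WHITELIST =
          Pattern.compile("[\\x20-\\x7E&&[^,]]*");

      static boolean allValuesAllowed(List<String> partVals) {
        for (String v : partVals) {
          if (!WHITELIST.matcher(v).matches()) {
            return false;
          }
        }
        return true;
      }

      public static void main(String[] args) {
        System.out.println(allValuesAllowed(Arrays.asList("part1", "part2"))); // true
        System.out.println(allValuesAllowed(Arrays.asList("a,b")));      // false: comma
        System.out.println(allValuesAllowed(Arrays.asList("klâwen")));   // false: non-ASCII
      }
    }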
Modified: hive/branches/ptf-windowing/pdk/scripts/conf/log4j.properties
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/pdk/scripts/conf/log4j.properties?rev=1451954&r1=1451953&r2=1451954&view=diff
==============================================================================
--- hive/branches/ptf-windowing/pdk/scripts/conf/log4j.properties (original)
+++ hive/branches/ptf-windowing/pdk/scripts/conf/log4j.properties Sat Mar 2 22:37:59 2013
@@ -66,7 +66,7 @@ log4j.appender.console.layout.Conversion
# Event Counter Appender
# Sends counts of logging messages at different severity levels to Hadoop Metrics.
#
-log4j.appender.EventCounter=org.apache.hadoop.metrics.jvm.EventCounter
+log4j.appender.EventCounter=org.apache.hadoop.hive.shims.HiveEventCounter
log4j.category.DataNucleus=ERROR,DRFA
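
This swaps the appender class for a Hive shim, presumably so the property resolves regardless of where the running Hadoop version keeps its EventCounter class. An event-counter appender writes nothing; it only tallies log events by severity. An illustrative log4j 1.x sketch of that behavior (not the HiveEventCounter implementation):

    import java.util.concurrent.atomic.AtomicLong;

    import org.apache.log4j.AppenderSkeleton;
    import org.apache.log4j.Level;
    import org.apache.log4j.spi.LoggingEvent;

    public class CountingAppender extends AppenderSkeleton {
      private final AtomicLong fatal = new AtomicLong();
      private final AtomicLong error = new AtomicLong();
      private final AtomicLong warn = new AtomicLong();
      private final AtomicLong info = new AtomicLong();

      @Override
      protected void append(LoggingEvent event) {
        // Count by severity; a real implementation would publish these
        // counters to Hadoop metrics.
        Level level = event.getLevel();
        if (level == Level.FATAL) {
          fatal.incrementAndGet();
        } else if (level == Level.ERROR) {
          error.incrementAndGet();
        } else if (level == Level.WARN) {
          warn.incrementAndGet();
        } else if (level == Level.INFO) {
          info.incrementAndGet();
        }
      }

      @Override
      public void close() {
      }

      @Override
      public boolean requiresLayout() {
        return false;
      }
    }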
Modified: hive/branches/ptf-windowing/ql/build.xml
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/build.xml?rev=1451954&r1=1451953&r2=1451954&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/build.xml (original)
+++ hive/branches/ptf-windowing/ql/build.xml Sat Mar 2 22:37:59 2013
@@ -142,7 +142,8 @@
<java classname="org.antlr.Tool" classpathref="classpath" fork="true">
<arg value="-fo" />
<arg value="${build.dir}/gen/antlr/gen-java/org/apache/hadoop/hive/ql/parse" />
- <arg value="${src.dir}/org/apache/hadoop/hive/ql/parse/Hive.g" />
+ <arg value="${src.dir}/org/apache/hadoop/hive/ql/parse/HiveLexer.g" />
+ <arg value="${src.dir}/org/apache/hadoop/hive/ql/parse/HiveParser.g" />
</java>
</target>
Modified: hive/branches/ptf-windowing/ql/src/java/conf/hive-exec-log4j.properties
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/conf/hive-exec-log4j.properties?rev=1451954&r1=1451953&r2=1451954&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/conf/hive-exec-log4j.properties (original)
+++ hive/branches/ptf-windowing/ql/src/java/conf/hive-exec-log4j.properties Sat Mar 2 22:37:59 2013
@@ -57,7 +57,7 @@ log4j.appender.console.layout.Conversion
# Event Counter Appender
# Sends counts of logging messages at different severity levels to Hadoop Metrics.
#
-log4j.appender.EventCounter=org.apache.hadoop.metrics.jvm.EventCounter
+log4j.appender.EventCounter=org.apache.hadoop.hive.shims.HiveEventCounter
log4j.category.DataNucleus=ERROR,FA
Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1451954&r1=1451953&r2=1451954&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Sat Mar 2 22:37:59 2013
@@ -975,7 +975,8 @@ public class Driver implements CommandPr
boolean valid = true;
if ((!conf.getBoolVar(HiveConf.ConfVars.HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES))
&& ((conf.getBoolVar(HiveConf.ConfVars.HADOOPMAPREDINPUTDIRRECURSIVE)) || (conf
- .getBoolVar(HiveConf.ConfVars.HIVEOPTLISTBUCKETING)))) {
+ .getBoolVar(HiveConf.ConfVars.HIVEOPTLISTBUCKETING)) || ((conf
+ .getBoolVar(HiveConf.ConfVars.HIVE_OPTIMIZE_UNION_REMOVE))))) {
errorMessage = "FAILED: Hive Internal Error: "
+ ErrorMsg.SUPPORT_DIR_MUST_TRUE_FOR_LIST_BUCKETING.getMsg();
SQLState = ErrorMsg.findSQLState(errorMessage);
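
Restated, the guard now covers three features instead of two: hive.mapred.supports.subdirectories must be on whenever recursive input dirs, list bucketing, or union remove is on, since all three rely on nested directories. A sketch of the same predicate, using the ConfVars from the diff (the helper name is illustrative):

    import org.apache.hadoop.hive.conf.HiveConf;

    public class SubdirGuard {
      static boolean valid(HiveConf conf) {
        boolean supportsSubdirs =
            conf.getBoolVar(HiveConf.ConfVars.HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES);
        boolean needsSubdirs =
            conf.getBoolVar(HiveConf.ConfVars.HADOOPMAPREDINPUTDIRRECURSIVE)
            || conf.getBoolVar(HiveConf.ConfVars.HIVEOPTLISTBUCKETING)
            || conf.getBoolVar(HiveConf.ConfVars.HIVE_OPTIMIZE_UNION_REMOVE);
        return supportsSubdirs || !needsSubdirs; // valid unless needed but unsupported
      }
    }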
Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java?rev=1451954&r1=1451953&r2=1451954&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java Sat Mar 2 22:37:59 2013
@@ -256,7 +256,8 @@ public enum ErrorMsg {
10199,
"hive.mapred.supports.subdirectories must be true"
+ " if any one of following is true: "
- + " hive.optimize.listbucketing and mapred.input.dir.recursive"),
+ + " hive.optimize.listbucketing , mapred.input.dir.recursive"
+ + " and hive.optimize.union.remove."),
SKEWED_TABLE_NO_COLUMN_NAME(10200, "No skewed column name."),
SKEWED_TABLE_NO_COLUMN_VALUE(10201, "No skewed values."),
SKEWED_TABLE_DUPLICATE_COLUMN_NAMES(10202,
Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java?rev=1451954&r1=1451953&r2=1451954&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java Sat Mar 2 22:37:59 2013
@@ -20,9 +20,7 @@ package org.apache.hadoop.hive.ql.exec;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -44,15 +42,15 @@ public abstract class AbstractMapJoinOpe
/**
* The expressions for join inputs's join keys.
*/
- protected transient Map<Byte, List<ExprNodeEvaluator>> joinKeys;
+ protected transient List<ExprNodeEvaluator>[] joinKeys;
/**
* The ObjectInspectors for the join inputs's join keys.
*/
- protected transient Map<Byte, List<ObjectInspector>> joinKeysObjectInspectors;
+ protected transient List<ObjectInspector>[] joinKeysObjectInspectors;
/**
* The standard ObjectInspectors for the join inputs's join keys.
*/
- protected transient Map<Byte, List<ObjectInspector>> joinKeysStandardObjectInspectors;
+ protected transient List<ObjectInspector>[] joinKeysStandardObjectInspectors;
protected transient byte posBigTable = -1; // one of the tables that is not in memory
transient int mapJoinRowsKey; // rows for a given key
@@ -78,19 +76,22 @@ public abstract class AbstractMapJoinOpe
}
@Override
+ @SuppressWarnings("unchecked")
protected void initializeOp(Configuration hconf) throws HiveException {
super.initializeOp(hconf);
numMapRowsRead = 0;
firstRow = true;
- joinKeys = new HashMap<Byte, List<ExprNodeEvaluator>>();
+ int tagLen = conf.getTagLength();
+
+ joinKeys = new List[tagLen];
JoinUtil.populateJoinKeyValue(joinKeys, conf.getKeys(), NOTSKIPBIGTABLE);
joinKeysObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinKeys,
- inputObjInspectors,NOTSKIPBIGTABLE);
+ inputObjInspectors,NOTSKIPBIGTABLE, tagLen);
joinKeysStandardObjectInspectors = JoinUtil.getStandardObjectInspectors(
- joinKeysObjectInspectors,NOTSKIPBIGTABLE);
+ joinKeysObjectInspectors,NOTSKIPBIGTABLE, tagLen);
// all other tables are small, and are cached in the hash table
posBigTable = (byte) conf.getPosBigTable();
@@ -98,10 +99,10 @@ public abstract class AbstractMapJoinOpe
emptyList = new RowContainer<ArrayList<Object>>(1, hconf, reporter);
RowContainer bigPosRC = JoinUtil.getRowContainer(hconf,
- rowContainerStandardObjectInspectors.get(posBigTable),
+ rowContainerStandardObjectInspectors[posBigTable],
posBigTable, joinCacheSize,spillTableDesc, conf,
!hasFilter(posBigTable), reporter);
- storage.put(posBigTable, bigPosRC);
+ storage[posBigTable] = bigPosRC;
mapJoinRowsKey = HiveConf.getIntVar(hconf,
HiveConf.ConfVars.HIVEMAPJOINROWSIZE);
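
This hunk is the first of several in this commit applying the same refactoring: per-alias join state that used to live in a HashMap<Byte, ...> moves into a plain array sized by conf.getTagLength() and indexed by tag. Tags are small and dense, so the array avoids Byte boxing and hash lookups; a skipped alias (such as the big table) becomes a null slot that iteration must tolerate. A self-contained sketch of the pattern:

    import java.util.ArrayList;
    import java.util.List;

    public class TagArrayDemo {
      public static void main(String[] args) {
        int tagLen = 3; // stands in for conf.getTagLength()

        // Generic array creation is unchecked, hence the @SuppressWarnings
        // the real operators now carry on initializeOp().
        @SuppressWarnings("unchecked")
        List<String>[] byTag = new List[tagLen];
        byTag[0] = new ArrayList<String>();
        byTag[2] = new ArrayList<String>();
        // byTag[1] stays null: "absent", where the map simply had no entry.

        for (byte tag = 0; tag < byTag.length; tag++) {
          if (byTag[tag] == null) {
            continue;
          }
          byTag[tag].add("value for tag " + tag);
        }
        System.out.println(byTag[0] + " " + byTag[2]);
      }
    }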
Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java?rev=1451954&r1=1451953&r2=1451954&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java Sat Mar 2 22:37:59 2013
@@ -20,7 +20,7 @@ package org.apache.hadoop.hive.ql.exec;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -90,34 +90,32 @@ public abstract class CommonJoinOperator
/**
* The expressions for join inputs.
*/
- protected transient Map<Byte, List<ExprNodeEvaluator>> joinValues;
+ protected transient List<ExprNodeEvaluator>[] joinValues;
/**
* The filters for join
*/
- protected transient Map<Byte, List<ExprNodeEvaluator>> joinFilters;
+ protected transient List<ExprNodeEvaluator>[] joinFilters;
protected transient int[][] filterMap;
/**
* The ObjectInspectors for the join inputs.
*/
- protected transient Map<Byte, List<ObjectInspector>> joinValuesObjectInspectors;
+ protected transient List<ObjectInspector>[] joinValuesObjectInspectors;
/**
* The ObjectInspectors for join filters.
*/
- protected transient
- Map<Byte, List<ObjectInspector>> joinFilterObjectInspectors;
+ protected transient List<ObjectInspector>[] joinFilterObjectInspectors;
/**
* The standard ObjectInspectors for the join inputs.
*/
- protected transient Map<Byte, List<ObjectInspector>> joinValuesStandardObjectInspectors;
+ protected transient List<ObjectInspector>[] joinValuesStandardObjectInspectors;
/**
* The standard ObjectInspectors for the row container.
*/
- protected transient
- Map<Byte, List<ObjectInspector>> rowContainerStandardObjectInspectors;
+ protected transient List<ObjectInspector>[] rowContainerStandardObjectInspectors;
protected transient Byte[] order; // order in which the results should
// be output
@@ -141,12 +139,12 @@ public abstract class CommonJoinOperator
private transient Map<Integer, Set<String>> posToAliasMap;
transient LazyBinarySerDe[] spillTableSerDe;
- protected transient Map<Byte, TableDesc> spillTableDesc; // spill tables are
+ protected transient TableDesc[] spillTableDesc; // spill tables are
// used if the join
// input is too large
// to fit in memory
- HashMap<Byte, AbstractRowContainer<ArrayList<Object>>> storage; // map b/w table alias
+ AbstractRowContainer<ArrayList<Object>>[] storage; // map b/w table alias
// to RowContainer
int joinEmitInterval = -1;
int joinCacheSize = 0;
@@ -206,12 +204,14 @@ public abstract class CommonJoinOperator
protected static <T extends JoinDesc> ObjectInspector getJoinOutputObjectInspector(
- Byte[] order, Map<Byte, List<ObjectInspector>> aliasToObjectInspectors,
+ Byte[] order, List<ObjectInspector>[] aliasToObjectInspectors,
T conf) {
- ArrayList<ObjectInspector> structFieldObjectInspectors = new ArrayList<ObjectInspector>();
+ List<ObjectInspector> structFieldObjectInspectors = new ArrayList<ObjectInspector>();
for (Byte alias : order) {
- List<ObjectInspector> oiList = aliasToObjectInspectors.get(alias);
- structFieldObjectInspectors.addAll(oiList);
+ List<ObjectInspector> oiList = aliasToObjectInspectors[alias];
+ if (oiList != null) {
+ structFieldObjectInspectors.addAll(oiList);
+ }
}
StructObjectInspector joinOutputObjectInspector = ObjectInspectorFactory
@@ -223,6 +223,7 @@ public abstract class CommonJoinOperator
Configuration hconf;
@Override
+ @SuppressWarnings("unchecked")
protected void initializeOp(Configuration hconf) throws HiveException {
this.handleSkewJoin = conf.getHandleSkewJoin();
this.hconf = hconf;
@@ -232,14 +233,16 @@ public abstract class CommonJoinOperator
countAfterReport = 0;
totalSz = 0;
+
+ int tagLen = conf.getTagLength();
// Map that contains the rows for each alias
- storage = new HashMap<Byte, AbstractRowContainer<ArrayList<Object>>>();
+ storage = new AbstractRowContainer[tagLen];
numAliases = conf.getExprs().size();
- joinValues = new HashMap<Byte, List<ExprNodeEvaluator>>();
+ joinValues = new List[tagLen];
- joinFilters = new HashMap<Byte, List<ExprNodeEvaluator>>();
+ joinFilters = new List[tagLen];
order = conf.getTagOrder();
condn = conf.getConds();
@@ -250,34 +253,33 @@ public abstract class CommonJoinOperator
order,NOTSKIPBIGTABLE);
//process join filters
- joinFilters = new HashMap<Byte, List<ExprNodeEvaluator>>();
+ joinFilters = new List[tagLen];
JoinUtil.populateJoinKeyValue(joinFilters, conf.getFilters(),order,NOTSKIPBIGTABLE);
joinValuesObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinValues,
- inputObjInspectors,NOTSKIPBIGTABLE);
+ inputObjInspectors,NOTSKIPBIGTABLE, tagLen);
joinFilterObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinFilters,
- inputObjInspectors,NOTSKIPBIGTABLE);
+ inputObjInspectors,NOTSKIPBIGTABLE, tagLen);
joinValuesStandardObjectInspectors = JoinUtil.getStandardObjectInspectors(
- joinValuesObjectInspectors,NOTSKIPBIGTABLE);
+ joinValuesObjectInspectors,NOTSKIPBIGTABLE, tagLen);
filterMap = conf.getFilterMap();
if (noOuterJoin) {
rowContainerStandardObjectInspectors = joinValuesStandardObjectInspectors;
} else {
- Map<Byte, List<ObjectInspector>> rowContainerObjectInspectors =
- new HashMap<Byte, List<ObjectInspector>>();
+ List<ObjectInspector>[] rowContainerObjectInspectors = new List[tagLen];
for (Byte alias : order) {
ArrayList<ObjectInspector> rcOIs = new ArrayList<ObjectInspector>();
- rcOIs.addAll(joinValuesObjectInspectors.get(alias));
+ rcOIs.addAll(joinValuesObjectInspectors[alias]);
// for each alias, add object inspector for boolean as the last element
rcOIs.add(
PrimitiveObjectInspectorFactory.writableByteObjectInspector);
- rowContainerObjectInspectors.put(alias, rcOIs);
+ rowContainerObjectInspectors[alias] = rcOIs;
}
rowContainerStandardObjectInspectors =
- JoinUtil.getStandardObjectInspectors(rowContainerObjectInspectors,NOTSKIPBIGTABLE);
+ JoinUtil.getStandardObjectInspectors(rowContainerObjectInspectors,NOTSKIPBIGTABLE, tagLen);
}
@@ -312,7 +314,7 @@ public abstract class CommonJoinOperator
dummyObj[pos] = nr;
// there should be only 1 dummy object in the RowContainer
RowContainer<ArrayList<Object>> values = JoinUtil.getRowContainer(hconf,
- rowContainerStandardObjectInspectors.get((byte)pos),
+ rowContainerStandardObjectInspectors[pos],
alias, 1, spillTableDesc, conf, !hasFilter(pos), reporter);
values.add((ArrayList<Object>) dummyObj[pos]);
@@ -321,9 +323,9 @@ public abstract class CommonJoinOperator
// if serde is null, the input doesn't need to be spilled out
// e.g., the output columns does not contains the input table
RowContainer rc = JoinUtil.getRowContainer(hconf,
- rowContainerStandardObjectInspectors.get((byte)pos),
+ rowContainerStandardObjectInspectors[pos],
alias, joinCacheSize,spillTableDesc, conf, !hasFilter(pos), reporter);
- storage.put(pos, rc);
+ storage[pos] = rc;
pos++;
}
@@ -340,7 +342,7 @@ public abstract class CommonJoinOperator
}
LOG.info("JOIN "
- + ((StructObjectInspector) outputObjInspector).getTypeName()
+ + outputObjInspector.getTypeName()
+ " totalsz = " + totalSz);
}
@@ -353,7 +355,7 @@ transient boolean newGroupStarted = fals
public void startGroup() throws HiveException {
LOG.trace("Join: Starting new group");
newGroupStarted = true;
- for (AbstractRowContainer<ArrayList<Object>> alw : storage.values()) {
+ for (AbstractRowContainer<ArrayList<Object>> alw : storage) {
alw.clear();
}
}
@@ -376,7 +378,7 @@ transient boolean newGroupStarted = fals
int p = 0;
for (int i = 0; i < numAliases; i++) {
Byte alias = order[i];
- int sz = joinValues.get(alias).size();
+ int sz = joinValues[alias].size();
if (nullsArr[i]) {
for (int j = 0; j < sz; j++) {
forwardCache[p++] = null;
@@ -675,7 +677,7 @@ transient boolean newGroupStarted = fals
if (aliasNum < numAliases) {
// search for match in the rhs table
- AbstractRowContainer<ArrayList<Object>> aliasRes = storage.get(order[aliasNum]);
+ AbstractRowContainer<ArrayList<Object>> aliasRes = storage[order[aliasNum]];
for (ArrayList<Object> newObj = aliasRes.first(); newObj != null; newObj = aliasRes
.next()) {
@@ -731,9 +733,9 @@ transient boolean newGroupStarted = fals
private void genUniqueJoinObject(int aliasNum, int forwardCachePos)
throws HiveException {
- AbstractRowContainer<ArrayList<Object>> alias = storage.get(order[aliasNum]);
+ AbstractRowContainer<ArrayList<Object>> alias = storage[order[aliasNum]];
for (ArrayList<Object> row = alias.first(); row != null; row = alias.next()) {
- int sz = joinValues.get(order[aliasNum]).size();
+ int sz = joinValues[order[aliasNum]].size();
int p = forwardCachePos;
for (int j = 0; j < sz; j++) {
forwardCache[p++] = row.get(j);
@@ -751,8 +753,8 @@ transient boolean newGroupStarted = fals
throws HiveException {
int p = 0;
for (int i = 0; i < numAliases; i++) {
- int sz = joinValues.get(order[i]).size();
- ArrayList<Object> obj = storage.get(order[i]).first();
+ int sz = joinValues[order[i]].size();
+ ArrayList<Object> obj = storage[order[i]].first();
for (int j = 0; j < sz; j++) {
forwardCache[p++] = obj.get(j);
}
@@ -774,7 +776,7 @@ transient boolean newGroupStarted = fals
boolean allOne = true;
for (int i = 0; i < numAliases; i++) {
Byte alias = order[i];
- AbstractRowContainer<ArrayList<Object>> alw = storage.get(alias);
+ AbstractRowContainer<ArrayList<Object>> alw = storage[alias];
if (alw.size() != 1) {
allOne = false;
@@ -807,7 +809,7 @@ transient boolean newGroupStarted = fals
boolean hasEmpty = false;
for (int i = 0; i < numAliases; i++) {
Byte alias = order[i];
- AbstractRowContainer<ArrayList<Object>> alw = storage.get(alias);
+ AbstractRowContainer<ArrayList<Object>> alw = storage[alias];
if (noOuterJoin) {
if (alw.size() == 0) {
@@ -858,7 +860,7 @@ transient boolean newGroupStarted = fals
// returns filter result of left object by filters associated with right alias
private boolean isLeftFiltered(int left, int right, List<Object> leftObj) {
- if (joinValues.get(order[left]).size() < leftObj.size()) {
+ if (joinValues[order[left]].size() < leftObj.size()) {
ByteWritable filter = (ByteWritable) leftObj.get(leftObj.size() - 1);
return JoinUtil.isFiltered(filter.get(), right);
}
@@ -867,7 +869,7 @@ transient boolean newGroupStarted = fals
// returns filter result of right object by filters associated with left alias
private boolean isRightFiltered(int left, int right, List<Object> rightObj) {
- if (joinValues.get(order[right]).size() < rightObj.size()) {
+ if (joinValues[order[right]].size() < rightObj.size()) {
ByteWritable filter = (ByteWritable) rightObj.get(rightObj.size() - 1);
return JoinUtil.isFiltered(filter.get(), left);
}
@@ -902,12 +904,12 @@ transient boolean newGroupStarted = fals
@Override
public void closeOp(boolean abort) throws HiveException {
LOG.trace("Join Op close");
- for (AbstractRowContainer<ArrayList<Object>> alw : storage.values()) {
+ for (AbstractRowContainer<ArrayList<Object>> alw : storage) {
if (alw != null) {
alw.clear(); // clean up the temp files
}
}
- storage.clear();
+ Arrays.fill(storage, null);
}
@Override
Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=1451954&r1=1451953&r2=1451954&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Sat Mar 2 22:37:59 2013
@@ -113,6 +113,7 @@ import org.apache.hadoop.hive.ql.plan.Al
import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
import org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes;
import org.apache.hadoop.hive.ql.plan.AlterTableSimpleDesc;
+import org.apache.hadoop.hive.ql.plan.AlterTableAlterPartDesc;
import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.CreateIndexDesc;
import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
@@ -418,6 +419,11 @@ public class DDLTask extends Task<DDLWor
return mergeFiles(db, mergeFilesDesc);
}
+ AlterTableAlterPartDesc alterPartDesc = work.getAlterTableAlterPartDesc();
+ if(alterPartDesc != null) {
+ return alterTableAlterPart(db, alterPartDesc);
+ }
+
TruncateTableDesc truncateTableDesc = work.getTruncateTblDesc();
if (truncateTableDesc != null) {
return truncateTable(db, truncateTableDesc);
@@ -1070,6 +1076,49 @@ public class DDLTask extends Task<DDLWor
}
/**
+ * Alter partition column type in a table
+ *
+ * @param db
+ * Database containing the table whose partition column is altered.
+ * @param alterPartitionDesc
+ * description of the partition column type change.
+ * @return Returns 0 when execution succeeds and above 0 if it fails.
+ * @throws HiveException
+ */
+ private int alterTableAlterPart(Hive db, AlterTableAlterPartDesc alterPartitionDesc)
+ throws HiveException {
+
+ Table tbl = db.getTable(alterPartitionDesc.getDbName(), alterPartitionDesc.getTableName());
+ String tabName = alterPartitionDesc.getTableName();
+
+ // This is checked by DDLSemanticAnalyzer
+ assert(tbl.isPartitioned());
+
+ List<FieldSchema> newPartitionKeys = new ArrayList<FieldSchema>();
+
+ for(FieldSchema col : tbl.getTTable().getPartitionKeys()) {
+ if (col.getName().compareTo(alterPartitionDesc.getPartKeySpec().getName()) == 0) {
+ newPartitionKeys.add(alterPartitionDesc.getPartKeySpec());
+ } else {
+ newPartitionKeys.add(col);
+ }
+ }
+
+ tbl.getTTable().setPartitionKeys(newPartitionKeys);
+
+ try {
+ db.alterTable(tabName, tbl);
+ } catch (InvalidOperationException e) {
+ throw new HiveException("Uable to update table");
+ }
+
+ work.getInputs().add(new ReadEntity(tbl));
+ work.getOutputs().add(new WriteEntity(tbl));
+
+ return 0;
+ }
+
+ /**
* Rewrite the partition's metadata and force the pre/post execute hooks to
* be fired.
*
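
The new alterTableAlterPart builds a fresh partition-key list, swapping in the new FieldSchema for the key whose name matches, then writes the table back. The swap step in isolation (standalone sketch; the real code reads the keys off the Table and goes through Hive.alterTable):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.hive.metastore.api.FieldSchema;

    public class SwapPartKey {
      static List<FieldSchema> replace(List<FieldSchema> keys, FieldSchema newKey) {
        List<FieldSchema> result = new ArrayList<FieldSchema>(keys.size());
        for (FieldSchema col : keys) {
          // Same comparison as the diff, via equals rather than compareTo == 0.
          result.add(col.getName().equals(newKey.getName()) ? newKey : col);
        }
        return result;
      }

      public static void main(String[] args) {
        List<FieldSchema> keys = Arrays.asList(
            new FieldSchema("dt", "string", null),
            new FieldSchema("hr", "string", null));
        // e.g. changing partition column "dt" from string to int
        System.out.println(replace(keys, new FieldSchema("dt", "int", null)));
      }
    }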
Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java?rev=1451954&r1=1451953&r2=1451954&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java Sat Mar 2 22:37:59 2013
@@ -94,6 +94,8 @@ public class ExecMapper extends MapReduc
localWork = mrwork.getMapLocalWork();
execContext.setLocalWork(localWork);
+ MapredContext.init(true, new JobConf(jc));
+
mo.setExecContext(execContext);
mo.initializeLocalWork(jc);
mo.initialize(jc, null);
@@ -131,6 +133,7 @@ public class ExecMapper extends MapReduc
mo.setOutputCollector(oc);
mo.setReporter(rp);
mo.setOperatorHooks(opHooks);
+ MapredContext.get().setReporter(reporter);
}
// reset the execContext for each new row
execContext.resetRow();
@@ -226,6 +229,8 @@ public class ExecMapper extends MapReduc
l4j.error("Hit error while closing operators - failing tree");
throw new RuntimeException("Hive Runtime Error while closing operators", e);
}
+ } finally {
+ MapredContext.close();
}
}
@@ -257,8 +262,8 @@ public class ExecMapper extends MapReduc
}
public void func(Operator op) {
- Map<Enum, Long> opStats = op.getStats();
- for (Map.Entry<Enum, Long> e : opStats.entrySet()) {
+ Map<Enum<?>, Long> opStats = op.getStats();
+ for (Map.Entry<Enum<?>, Long> e : opStats.entrySet()) {
if (rp != null) {
rp.incrCounter(e.getKey(), e.getValue());
}
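
ExecMapper (and ExecReducer below) now bracket task execution with MapredContext.init() in configure() and MapredContext.close() in a finally block, publishing the Reporter once the framework supplies one. An illustrative sketch, assuming the per-thread shape the init/get/close trio suggests (this is not Hive's MapredContext, just the pattern):

    import org.apache.hadoop.mapred.Reporter;

    public final class TaskContext {
      private static final ThreadLocal<TaskContext> CURRENT =
          new ThreadLocal<TaskContext>();

      private final boolean isMap;
      private Reporter reporter; // set lazily, on the first processed row

      private TaskContext(boolean isMap) {
        this.isMap = isMap;
      }

      public static TaskContext init(boolean isMap) {
        TaskContext ctx = new TaskContext(isMap);
        CURRENT.set(ctx);
        return ctx;
      }

      public static TaskContext get() {
        return CURRENT.get(); // null outside a task, e.g. at compile time
      }

      public static void close() {
        CURRENT.remove();
      }

      public boolean isMap() {
        return isMap;
      }

      public void setReporter(Reporter reporter) {
        this.reporter = reporter;
      }

      public Reporter getReporter() {
        return reporter;
      }
    }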
Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java?rev=1451954&r1=1451953&r2=1451954&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java Sat Mar 2 22:37:59 2013
@@ -146,6 +146,8 @@ public class ExecReducer extends MapRedu
throw new RuntimeException(e);
}
+ MapredContext.init(false, new JobConf(jc));
+
// initialize reduce operator tree
try {
l4j.info(reducer.dump(0));
@@ -182,6 +184,7 @@ public class ExecReducer extends MapRedu
reducer.setOutputCollector(oc);
reducer.setReporter(rp);
reducer.setOperatorHooks(opHooks);
+ MapredContext.get().setReporter(reporter);
}
try {
@@ -317,6 +320,8 @@ public class ExecReducer extends MapRedu
throw new RuntimeException("Hive Runtime Error while closing operators: "
+ e.getMessage(), e);
}
+ } finally {
+ MapredContext.close();
}
}
}
Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeGenericFuncEvaluator.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeGenericFuncEvaluator.java?rev=1451954&r1=1451953&r2=1451954&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeGenericFuncEvaluator.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeGenericFuncEvaluator.java Sat Mar 2 22:37:59 2013
@@ -134,6 +134,10 @@ public class ExprNodeGenericFuncEvaluato
throw new HiveException(
"Stateful expressions cannot be used inside of CASE");
}
+ MapredContext context = MapredContext.get();
+ if (context != null) {
+ context.setup(genericUDF);
+ }
this.outputOI = genericUDF.initializeAndFoldConstants(childrenOIs);
return this.outputOI;
}
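
With this hook, a GenericUDF can receive runtime state before evaluation starts: the evaluator hands every UDF to MapredContext.setup() during initialization, which only happens inside a task (MapredContext.get() returns null at compile time). A sketch of a UDF using it, assuming setup() calls back into a configure(MapredContext) method on the UDF, in the same way the GroupByOperator hunk below wires up UDAF evaluators:

    import org.apache.hadoop.hive.ql.exec.MapredContext;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    public class GenericUDFInTask extends GenericUDF {
      private boolean inTask = false;

      public void configure(MapredContext context) {
        // Reached only at task runtime, never during constant folding.
        inTask = (context != null);
      }

      @Override
      public ObjectInspector initialize(ObjectInspector[] arguments) {
        return PrimitiveObjectInspectorFactory.javaBooleanObjectInspector;
      }

      @Override
      public Object evaluate(DeferredObject[] arguments) throws HiveException {
        return Boolean.valueOf(inTask);
      }

      @Override
      public String getDisplayString(String[] children) {
        return "in_task()";
      }
    }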
Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1451954&r1=1451953&r2=1451954&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Sat Mar 2 22:37:59 2013
@@ -34,7 +34,6 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveKey;
@@ -323,8 +322,7 @@ public class FileSinkOperator extends Te
hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
isCompressed = conf.getCompressed();
parent = Utilities.toTempPath(conf.getDirName());
- statsCollectRawDataSize =
- HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVE_STATS_COLLECT_RAWDATASIZE);
+ statsCollectRawDataSize = conf.isStatsCollectRawDataSize();
serializer = (Serializer) conf.getTableInfo().getDeserializerClass().newInstance();
serializer.initialize(null, conf.getTableInfo().getProperties());
Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java?rev=1451954&r1=1451953&r2=1451954&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java Sat Mar 2 22:37:59 2013
@@ -172,6 +172,11 @@ public class FilterOperator extends Oper
}
@Override
+ public boolean supportAutomaticSortMergeJoin() {
+ return true;
+ }
+
+ @Override
public boolean supportUnionRemoveOptimization() {
return true;
}
Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java?rev=1451954&r1=1451953&r2=1451954&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java Sat Mar 2 22:37:59 2013
@@ -204,6 +204,7 @@ import org.apache.hadoop.hive.ql.udf.gen
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFPrintf;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFReflect;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFReflect2;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFSentences;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFSize;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFSortArray;
@@ -477,6 +478,7 @@ public final class FunctionRegistry {
// Generic UDFs
registerGenericUDF("reflect", GenericUDFReflect.class);
+ registerGenericUDF("reflect2", GenericUDFReflect2.class);
registerGenericUDF("java_method", GenericUDFReflect.class);
registerGenericUDF("array", GenericUDFArray.class);
Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=1451954&r1=1451953&r2=1451954&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Sat Mar 2 22:37:59 2013
@@ -363,6 +363,12 @@ public class GroupByOperator extends Ope
for (ExprNodeEvaluator keyField : keyFields) {
objectInspectors.add(null);
}
+ MapredContext context = MapredContext.get();
+ if (context != null) {
+ for (GenericUDAFEvaluator genericUDAFEvaluator : aggregationEvaluators) {
+ context.setup(genericUDAFEvaluator);
+ }
+ }
for (int i = 0; i < aggregationEvaluators.length; i++) {
ObjectInspector roi = aggregationEvaluators[i].init(conf.getAggregators()
.get(i).getMode(), aggregationParameterObjectInspectors[i]);
Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java?rev=1451954&r1=1451953&r2=1451954&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java Sat Mar 2 22:37:59 2013
@@ -20,10 +20,7 @@ package org.apache.hadoop.hive.ql.exec;
import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -61,15 +58,15 @@ public class HashTableSinkOperator exten
/**
* The expressions for join inputs's join keys.
*/
- protected transient Map<Byte, List<ExprNodeEvaluator>> joinKeys;
+ protected transient List<ExprNodeEvaluator>[] joinKeys;
/**
* The ObjectInspectors for the join inputs's join keys.
*/
- protected transient Map<Byte, List<ObjectInspector>> joinKeysObjectInspectors;
+ protected transient List<ObjectInspector>[] joinKeysObjectInspectors;
/**
* The standard ObjectInspectors for the join inputs's join keys.
*/
- protected transient Map<Byte, List<ObjectInspector>> joinKeysStandardObjectInspectors;
+ protected transient List<ObjectInspector>[] joinKeysStandardObjectInspectors;
protected transient int posBigTableAlias = -1; // one of the tables that is not in memory
transient int mapJoinRowsKey; // rows for a given key
@@ -82,7 +79,7 @@ public class HashTableSinkOperator exten
/**
* The filters for join
*/
- protected transient Map<Byte, List<ExprNodeEvaluator>> joinFilters;
+ protected transient List<ExprNodeEvaluator>[] joinFilters;
protected transient int[][] filterMap;
@@ -90,28 +87,28 @@ public class HashTableSinkOperator exten
/**
* The expressions for join outputs.
*/
- protected transient Map<Byte, List<ExprNodeEvaluator>> joinValues;
+ protected transient List<ExprNodeEvaluator>[] joinValues;
/**
* The ObjectInspectors for the join inputs.
*/
- protected transient Map<Byte, List<ObjectInspector>> joinValuesObjectInspectors;
+ protected transient List<ObjectInspector>[] joinValuesObjectInspectors;
/**
* The ObjectInspectors for join filters.
*/
- protected transient Map<Byte, List<ObjectInspector>> joinFilterObjectInspectors;
+ protected transient List<ObjectInspector>[] joinFilterObjectInspectors;
/**
* The standard ObjectInspectors for the join inputs.
*/
- protected transient Map<Byte, List<ObjectInspector>> joinValuesStandardObjectInspectors;
+ protected transient List<ObjectInspector>[] joinValuesStandardObjectInspectors;
- protected transient Map<Byte, List<ObjectInspector>> rowContainerStandardObjectInspectors;
+ protected transient List<ObjectInspector>[] rowContainerStandardObjectInspectors;
protected transient Byte[] order; // order in which the results should
Configuration hconf;
protected transient Byte alias;
- protected transient Map<Byte, TableDesc> spillTableDesc; // spill tables are
+ protected transient TableDesc[] spillTableDesc; // spill tables are
- protected transient Map<Byte, HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>> mapJoinTables;
+ protected transient HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>[] mapJoinTables;
protected transient boolean noOuterJoin;
private long rowNumber = 0;
@@ -178,6 +175,7 @@ public class HashTableSinkOperator exten
@Override
+ @SuppressWarnings("unchecked")
protected void initializeOp(Configuration hconf) throws HiveException {
boolean isSilent = HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVESESSIONSILENT);
console = new LogHelper(LOG, isSilent);
@@ -197,52 +195,55 @@ public class HashTableSinkOperator exten
noOuterJoin = conf.isNoOuterJoin();
filterMap = conf.getFilterMap();
+ int tagLen = conf.getTagLength();
+
// process join keys
- joinKeys = new HashMap<Byte, List<ExprNodeEvaluator>>();
+ joinKeys = new List[tagLen];
JoinUtil.populateJoinKeyValue(joinKeys, conf.getKeys(), posBigTableAlias);
joinKeysObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinKeys,
- inputObjInspectors, posBigTableAlias);
+ inputObjInspectors, posBigTableAlias, tagLen);
joinKeysStandardObjectInspectors = JoinUtil.getStandardObjectInspectors(
- joinKeysObjectInspectors, posBigTableAlias);
+ joinKeysObjectInspectors, posBigTableAlias, tagLen);
// process join values
- joinValues = new HashMap<Byte, List<ExprNodeEvaluator>>();
+ joinValues = new List[tagLen];
JoinUtil.populateJoinKeyValue(joinValues, conf.getExprs(), posBigTableAlias);
joinValuesObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinValues,
- inputObjInspectors, posBigTableAlias);
+ inputObjInspectors, posBigTableAlias, tagLen);
joinValuesStandardObjectInspectors = JoinUtil.getStandardObjectInspectors(
- joinValuesObjectInspectors, posBigTableAlias);
+ joinValuesObjectInspectors, posBigTableAlias, tagLen);
// process join filters
- joinFilters = new HashMap<Byte, List<ExprNodeEvaluator>>();
+ joinFilters = new List[tagLen];
JoinUtil.populateJoinKeyValue(joinFilters, conf.getFilters(), posBigTableAlias);
joinFilterObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinFilters,
- inputObjInspectors, posBigTableAlias);
+ inputObjInspectors, posBigTableAlias, tagLen);
if (noOuterJoin) {
rowContainerStandardObjectInspectors = joinValuesStandardObjectInspectors;
} else {
- Map<Byte, List<ObjectInspector>> rowContainerObjectInspectors = new HashMap<Byte, List<ObjectInspector>>();
+ List<ObjectInspector>[] rowContainerObjectInspectors = new List[tagLen];
for (Byte alias : order) {
if (alias == posBigTableAlias) {
continue;
}
- List<ObjectInspector> rcOIs = joinValuesObjectInspectors.get(alias);
+ List<ObjectInspector> rcOIs = joinValuesObjectInspectors[alias];
if (filterMap != null && filterMap[alias] != null) {
// for each alias, add object inspector for filter tag as the last element
rcOIs = new ArrayList<ObjectInspector>(rcOIs);
rcOIs.add(PrimitiveObjectInspectorFactory.writableByteObjectInspector);
}
- rowContainerObjectInspectors.put(alias, rcOIs);
+ rowContainerObjectInspectors[alias] = rcOIs;
}
- rowContainerStandardObjectInspectors = getStandardObjectInspectors(rowContainerObjectInspectors);
+ rowContainerStandardObjectInspectors = getStandardObjectInspectors(
+ rowContainerObjectInspectors, tagLen);
}
metadataValueTag = new int[numAliases];
for (int pos = 0; pos < numAliases; pos++) {
metadataValueTag[pos] = -1;
}
- mapJoinTables = new HashMap<Byte, HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>>();
+ mapJoinTables = new HashMapWrapper[tagLen];
int hashTableThreshold = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD);
float hashTableLoadFactor = HiveConf.getFloatVar(hconf,
@@ -263,24 +264,26 @@ public class HashTableSinkOperator exten
HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = new HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>(
hashTableThreshold, hashTableLoadFactor, hashTableMaxMemoryUsage);
- mapJoinTables.put(pos, hashTable);
+ mapJoinTables[pos] = hashTable;
}
}
- protected static HashMap<Byte, List<ObjectInspector>> getStandardObjectInspectors(
- Map<Byte, List<ObjectInspector>> aliasToObjectInspectors) {
- HashMap<Byte, List<ObjectInspector>> result = new HashMap<Byte, List<ObjectInspector>>();
- for (Entry<Byte, List<ObjectInspector>> oiEntry : aliasToObjectInspectors.entrySet()) {
- Byte alias = oiEntry.getKey();
- List<ObjectInspector> oiList = oiEntry.getValue();
+ protected static List<ObjectInspector>[] getStandardObjectInspectors(
+ List<ObjectInspector>[] aliasToObjectInspectors, int maxTag) {
+ List<ObjectInspector>[] result = new List[maxTag];
+ for (byte alias = 0; alias < aliasToObjectInspectors.length; alias++) {
+ List<ObjectInspector> oiList = aliasToObjectInspectors[alias];
+ if (oiList == null) {
+ continue;
+ }
ArrayList<ObjectInspector> fieldOIList = new ArrayList<ObjectInspector>(oiList.size());
for (int i = 0; i < oiList.size(); i++) {
fieldOIList.add(ObjectInspectorUtils.getStandardObjectInspector(oiList.get(i),
ObjectInspectorCopyOption.WRITABLE));
}
- result.put(alias, fieldOIList);
+ result[alias] = fieldOIList;
}
return result;
@@ -313,15 +316,15 @@ public class HashTableSinkOperator exten
alias = (byte)tag;
// compute keys and values as StandardObjects
- AbstractMapJoinKey keyMap = JoinUtil.computeMapJoinKeys(row, joinKeys.get(alias),
- joinKeysObjectInspectors.get(alias));
+ AbstractMapJoinKey keyMap = JoinUtil.computeMapJoinKeys(row, joinKeys[alias],
+ joinKeysObjectInspectors[alias]);
- Object[] value = JoinUtil.computeMapJoinValues(row, joinValues.get(alias),
- joinValuesObjectInspectors.get(alias), joinFilters.get(alias), joinFilterObjectInspectors
- .get(alias), filterMap == null ? null : filterMap[alias]);
+ Object[] value = JoinUtil.computeMapJoinValues(row, joinValues[alias],
+ joinValuesObjectInspectors[alias], joinFilters[alias], joinFilterObjectInspectors
+ [alias], filterMap == null ? null : filterMap[alias]);
- HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = mapJoinTables.get(alias);
+ HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = mapJoinTables[alias];
MapJoinObjectValue o = hashTable.get(keyMap);
MapJoinRowContainer<Object[]> res = null;
@@ -369,7 +372,7 @@ public class HashTableSinkOperator exten
valueSerDe.initialize(null, valueTableDesc.getProperties());
- List<ObjectInspector> newFields = rowContainerStandardObjectInspectors.get((Byte) alias);
+ List<ObjectInspector> newFields = rowContainerStandardObjectInspectors[alias];
int length = newFields.size();
List<String> newNames = new ArrayList<String>(length);
for (int i = 0; i < length; i++) {
@@ -391,11 +394,12 @@ public class HashTableSinkOperator exten
String tmpURI = this.getExecContext().getLocalWork().getTmpFileURI();
LOG.info("Get TMP URI: " + tmpURI);
long fileLength;
- for (Map.Entry<Byte, HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>> hashTables : mapJoinTables
- .entrySet()) {
+ for (byte tag = 0; tag < mapJoinTables.length; tag++) {
// get the key and value
- Byte tag = hashTables.getKey();
- HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = hashTables.getValue();
+ HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = mapJoinTables[tag];
+ if (hashTable == null) {
+ continue;
+ }
// get current input file name
String bigBucketFileName = getExecContext().getCurrentBigBucketFile();
Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=1451954&r1=1451953&r2=1451954&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Sat Mar 2 22:37:59 2013
@@ -81,9 +81,9 @@ public class JoinOperator extends Common
}
- ArrayList<Object> nr = JoinUtil.computeValues(row, joinValues.get(alias),
- joinValuesObjectInspectors.get(alias), joinFilters.get(alias),
- joinFilterObjectInspectors.get(alias),
+ ArrayList<Object> nr = JoinUtil.computeValues(row, joinValues[alias],
+ joinValuesObjectInspectors[alias], joinFilters[alias],
+ joinFilterObjectInspectors[alias],
filterMap == null ? null : filterMap[alias]);
@@ -92,7 +92,7 @@ public class JoinOperator extends Common
}
// number of rows for the key in the given table
- int sz = storage.get(alias).size();
+ int sz = storage[alias].size();
StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[tag];
StructField sf = soi.getStructFieldRef(Utilities.ReduceField.KEY
.toString());
@@ -107,7 +107,7 @@ public class JoinOperator extends Common
// storage,
// to preserve the correctness for outer joins.
checkAndGenObject();
- storage.get(alias).clear();
+ storage[alias].clear();
}
} else {
if (sz == nextSz) {
@@ -128,7 +128,7 @@ public class JoinOperator extends Common
endGroup();
startGroup();
}
- storage.get(alias).add(nr);
+ storage[alias].add(nr);
} catch (Exception e) {
e.printStackTrace();
throw new HiveException(e);
Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java?rev=1451954&r1=1451953&r2=1451954&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java Sat Mar 2 22:37:59 2013
@@ -18,8 +18,6 @@
package org.apache.hadoop.hive.ql.exec;
import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -52,64 +50,59 @@ import org.apache.hadoop.util.Reflection
public class JoinUtil {
- public static HashMap<Byte, List<ObjectInspector>> getObjectInspectorsFromEvaluators(
- Map<Byte, List<ExprNodeEvaluator>> exprEntries,
+ public static List<ObjectInspector>[] getObjectInspectorsFromEvaluators(
+ List<ExprNodeEvaluator>[] exprEntries,
ObjectInspector[] inputObjInspector,
- int posBigTableAlias) throws HiveException {
- HashMap<Byte, List<ObjectInspector>> result = new HashMap<Byte, List<ObjectInspector>>();
- for (Entry<Byte, List<ExprNodeEvaluator>> exprEntry : exprEntries
- .entrySet()) {
- Byte alias = exprEntry.getKey();
+ int posBigTableAlias, int tagLen) throws HiveException {
+ List<ObjectInspector>[] result = new List[tagLen];
+ for (byte alias = 0; alias < exprEntries.length; alias++) {
//get big table
- if(alias == (byte) posBigTableAlias){
+ if (alias == (byte) posBigTableAlias){
//skip the big tables
continue;
}
- List<ExprNodeEvaluator> exprList = exprEntry.getValue();
- ArrayList<ObjectInspector> fieldOIList = new ArrayList<ObjectInspector>();
+ List<ExprNodeEvaluator> exprList = exprEntries[alias];
+ List<ObjectInspector> fieldOIList = new ArrayList<ObjectInspector>();
for (int i = 0; i < exprList.size(); i++) {
fieldOIList.add(exprList.get(i).initialize(inputObjInspector[alias]));
}
- result.put(alias, fieldOIList);
+ result[alias] = fieldOIList;
}
return result;
}
- public static HashMap<Byte, List<ObjectInspector>> getStandardObjectInspectors(
- Map<Byte, List<ObjectInspector>> aliasToObjectInspectors,
- int posBigTableAlias) {
- HashMap<Byte, List<ObjectInspector>> result = new HashMap<Byte, List<ObjectInspector>>();
- for (Entry<Byte, List<ObjectInspector>> oiEntry : aliasToObjectInspectors
- .entrySet()) {
- Byte alias = oiEntry.getKey();
-
+ public static List<ObjectInspector>[] getStandardObjectInspectors(
+ List<ObjectInspector>[] aliasToObjectInspectors,
+ int posBigTableAlias, int tagLen) {
+ List<ObjectInspector>[] result = new List[tagLen];
+ for (byte alias = 0; alias < aliasToObjectInspectors.length; alias++) {
//get big table
if(alias == (byte) posBigTableAlias ){
//skip the big tables
continue;
}
- List<ObjectInspector> oiList = oiEntry.getValue();
+ List<ObjectInspector> oiList = aliasToObjectInspectors[alias];
ArrayList<ObjectInspector> fieldOIList = new ArrayList<ObjectInspector>(
oiList.size());
for (int i = 0; i < oiList.size(); i++) {
fieldOIList.add(ObjectInspectorUtils.getStandardObjectInspector(oiList
.get(i), ObjectInspectorCopyOption.WRITABLE));
}
- result.put(alias, fieldOIList);
+ result[alias] = fieldOIList;
}
return result;
}
- public static int populateJoinKeyValue(Map<Byte, List<ExprNodeEvaluator>> outMap,
+ public static int populateJoinKeyValue(List<ExprNodeEvaluator>[] outMap,
Map<Byte, List<ExprNodeDesc>> inputMap, int posBigTableAlias) {
return populateJoinKeyValue(outMap, inputMap, null, posBigTableAlias);
}
- public static int populateJoinKeyValue(Map<Byte, List<ExprNodeEvaluator>> outMap,
+ public static int populateJoinKeyValue(List<ExprNodeEvaluator>[] outMap,
Map<Byte, List<ExprNodeDesc>> inputMap,
Byte[] order,
int posBigTableAlias) {
@@ -124,7 +117,7 @@ public class JoinUtil {
valueFields.add(ExprNodeEvaluatorFactory.get(expr));
}
}
- outMap.put(key, valueFields);
+ outMap[key] = valueFields;
total += valueFields.size();
}
@@ -289,27 +282,16 @@ public class JoinUtil {
return tag != 0;
}
- public static TableDesc getSpillTableDesc(Byte alias,
- Map<Byte, TableDesc> spillTableDesc,JoinDesc conf,
- boolean noFilter) {
- if (spillTableDesc == null || spillTableDesc.size() == 0) {
+ public static TableDesc getSpillTableDesc(Byte alias, TableDesc[] spillTableDesc,
+ JoinDesc conf, boolean noFilter) {
+ if (spillTableDesc == null || spillTableDesc.length == 0) {
spillTableDesc = initSpillTables(conf,noFilter);
}
- return spillTableDesc.get(alias);
- }
-
- public static Map<Byte, TableDesc> getSpillTableDesc(
- Map<Byte, TableDesc> spillTableDesc,JoinDesc conf,
- boolean noFilter) {
- if (spillTableDesc == null) {
- spillTableDesc = initSpillTables(conf,noFilter);
- }
- return spillTableDesc;
+ return spillTableDesc[alias];
}
- public static SerDe getSpillSerDe(byte alias,
- Map<Byte, TableDesc> spillTableDesc,JoinDesc conf,
- boolean noFilter) {
+ public static SerDe getSpillSerDe(byte alias, TableDesc[] spillTableDesc,
+ JoinDesc conf, boolean noFilter) {
TableDesc desc = getSpillTableDesc(alias, spillTableDesc, conf, noFilter);
if (desc == null) {
return null;
@@ -325,9 +307,10 @@ public class JoinUtil {
return sd;
}
- public static Map<Byte, TableDesc> initSpillTables(JoinDesc conf, boolean noFilter) {
+ public static TableDesc[] initSpillTables(JoinDesc conf, boolean noFilter) {
+ int tagLen = conf.getTagLength();
Map<Byte, List<ExprNodeDesc>> exprs = conf.getExprs();
- Map<Byte, TableDesc> spillTableDesc = new HashMap<Byte, TableDesc>(exprs.size());
+ TableDesc[] spillTableDesc = new TableDesc[tagLen];
for (int tag = 0; tag < exprs.size(); tag++) {
List<ExprNodeDesc> valueCols = exprs.get((byte) tag);
int columnSize = valueCols.size();
@@ -362,7 +345,7 @@ public class JoinUtil {
.toString(),
org.apache.hadoop.hive.serde.serdeConstants.LIST_COLUMN_TYPES,
colTypes.toString()));
- spillTableDesc.put((byte) tag, tblDesc);
+ spillTableDesc[tag] = tblDesc;
}
return spillTableDesc;
}
@@ -370,7 +353,7 @@ public class JoinUtil {
public static RowContainer getRowContainer(Configuration hconf,
List<ObjectInspector> structFieldObjectInspectors,
- Byte alias,int containerSize, Map<Byte, TableDesc> spillTableDesc,
+ Byte alias,int containerSize, TableDesc[] spillTableDesc,
JoinDesc conf,boolean noFilter, Reporter reporter) throws HiveException {
TableDesc tblDesc = JoinUtil.getSpillTableDesc(alias,spillTableDesc,conf, noFilter);
Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1451954&r1=1451953&r2=1451954&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Sat Mar 2 22:37:59 2013
@@ -20,8 +20,6 @@ package org.apache.hadoop.hive.ql.exec;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -53,14 +51,14 @@ public class MapJoinOperator extends Abs
private static final Log LOG = LogFactory.getLog(MapJoinOperator.class.getName());
- protected transient Map<Byte, HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>> mapJoinTables;
+ protected transient HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>[] mapJoinTables;
private static final transient String[] FATAL_ERR_MSG = {
null, // counter value 0 means no error
"Mapside join exceeds available memory. "
+ "Please try removing the mapjoin hint."};
- protected transient Map<Byte, MapJoinRowContainer<ArrayList<Object>>> rowContainerMap;
+ protected transient MapJoinRowContainer<ArrayList<Object>>[] rowContainerMap;
transient int metadataKeyTag;
transient int[] metadataValueTag;
transient boolean hashTblInitedOnce;
@@ -73,6 +71,7 @@ public class MapJoinOperator extends Abs
}
@Override
+ @SuppressWarnings("unchecked")
protected void initializeOp(Configuration hconf) throws HiveException {
super.initializeOp(hconf);
@@ -84,8 +83,10 @@ public class MapJoinOperator extends Abs
metadataKeyTag = -1;
- mapJoinTables = new HashMap<Byte, HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>>();
- rowContainerMap = new HashMap<Byte, MapJoinRowContainer<ArrayList<Object>>>();
+ int tagLen = conf.getTagLength();
+
+ mapJoinTables = new HashMapWrapper[tagLen];
+ rowContainerMap = new MapJoinRowContainer[tagLen];
// initialize the hash tables for other tables
for (int pos = 0; pos < numAliases; pos++) {
if (pos == posBigTable) {
@@ -94,9 +95,9 @@ public class MapJoinOperator extends Abs
HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = new HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>();
- mapJoinTables.put(Byte.valueOf((byte) pos), hashTable);
+ mapJoinTables[pos] = hashTable;
MapJoinRowContainer<ArrayList<Object>> rowContainer = new MapJoinRowContainer<ArrayList<Object>>();
- rowContainerMap.put(Byte.valueOf((byte) pos), rowContainer);
+ rowContainerMap[pos] = rowContainer;
}
hashTblInitedOnce = false;
@@ -175,10 +176,11 @@ public class MapJoinOperator extends Abs
baseDir = archiveLocalLink.toUri().getPath();
}
}
- for (Map.Entry<Byte, HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>> entry : mapJoinTables
- .entrySet()) {
- Byte pos = entry.getKey();
- HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashtable = entry.getValue();
+ for (byte pos = 0; pos < mapJoinTables.length; pos++) {
+ HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashtable = mapJoinTables[pos];
+ if (hashtable == null) {
+ continue;
+ }
String filePath = Utilities.generatePath(baseDir, conf.getDumpFilePrefix(), pos, fileName);
Path path = new Path(filePath);
LOG.info("\tLoad back 1 hashtable file from tmp file uri:" + path.toString());
@@ -224,32 +226,32 @@ public class MapJoinOperator extends Abs
}
// compute keys and values as StandardObjects
- AbstractMapJoinKey key = JoinUtil.computeMapJoinKeys(row, joinKeys.get(alias),
- joinKeysObjectInspectors.get(alias));
- ArrayList<Object> value = JoinUtil.computeValues(row, joinValues.get(alias),
- joinValuesObjectInspectors.get(alias), joinFilters.get(alias), joinFilterObjectInspectors
- .get(alias), filterMap == null ? null : filterMap[alias]);
+ AbstractMapJoinKey key = JoinUtil.computeMapJoinKeys(row, joinKeys[alias],
+ joinKeysObjectInspectors[alias]);
+ ArrayList<Object> value = JoinUtil.computeValues(row, joinValues[alias],
+ joinValuesObjectInspectors[alias], joinFilters[alias], joinFilterObjectInspectors
+ [alias], filterMap == null ? null : filterMap[alias]);
// Add the value to the ArrayList
- storage.get(alias).add(value);
+ storage[alias].add(value);
for (byte pos = 0; pos < order.length; pos++) {
if (pos != alias) {
- MapJoinObjectValue o = mapJoinTables.get(pos).get(key);
- MapJoinRowContainer<ArrayList<Object>> rowContainer = rowContainerMap.get(pos);
+ MapJoinObjectValue o = mapJoinTables[pos].get(key);
+ MapJoinRowContainer<ArrayList<Object>> rowContainer = rowContainerMap[pos];
      // there is either no join value, or the join key has all-null elements
if (o == null || key.hasAnyNulls(nullsafes)) {
if (noOuterJoin) {
- storage.put(pos, emptyList);
+ storage[pos] = emptyList;
} else {
- storage.put(pos, dummyObjVectors[pos]);
+ storage[pos] = dummyObjVectors[pos];
}
} else {
rowContainer.reset(o.getObj());
- storage.put(pos, rowContainer);
+ storage[pos] = rowContainer;
}
}
}
@@ -258,11 +260,11 @@ public class MapJoinOperator extends Abs
checkAndGenObject();
// done with the row
- storage.get((byte) tag).clear();
+ storage[tag].clear();
for (byte pos = 0; pos < order.length; pos++) {
if (pos != tag) {
- storage.put(pos, null);
+ storage[pos] = null;
}
}
@@ -276,8 +278,10 @@ public class MapJoinOperator extends Abs
public void closeOp(boolean abort) throws HiveException {
if (mapJoinTables != null) {
- for (HashMapWrapper<?, ?> hashTable : mapJoinTables.values()) {
- hashTable.close();
+ for (HashMapWrapper<?, ?> hashTable : mapJoinTables) {
+ if (hashTable != null) {
+ hashTable.close();
+ }
}
}
super.closeOp(abort);
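
Two consequences of the array form are visible in the MapJoinOperator hunks
above: Java forbids creating generic arrays directly, so initializeOp builds a
raw array under @SuppressWarnings("unchecked"), and the big table's slot is
simply left null, which is why the load and close loops gain explicit null
checks (the old entrySet() iteration only ever saw populated slots). A
standalone sketch of both points, with illustrative names:

    import java.util.HashMap;

    public class GenericArraySketch {
      @SuppressWarnings("unchecked")
      public static void main(String[] args) {
        int tagLen = 3;
        int posBigTable = 0;
        // new HashMap<String, Integer>[tagLen] would not compile; the raw form does.
        HashMap<String, Integer>[] tables = new HashMap[tagLen];
        for (int pos = 0; pos < tagLen; pos++) {
          if (pos == posBigTable) {
            continue; // big table gets no hash table: its slot stays null
          }
          tables[pos] = new HashMap<String, Integer>();
        }
        for (HashMap<String, Integer> t : tables) {
          if (t != null) { // mirrors the null guard added to closeOp
            t.clear();
          }
        }
      }
    }
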
Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=1451954&r1=1451953&r2=1451954&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Sat Mar 2 22:37:59 2013
@@ -46,6 +46,7 @@ import org.apache.hadoop.hive.ql.io.rcfi
import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
+import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -393,6 +394,8 @@ public class MoveTask extends Task<MoveW
}
dc = null; // reset data container to prevent it being added again.
} else { // static partitions
+ List<String> partVals = Hive.getPvals(table.getPartCols(), tbd.getPartitionSpec());
+ db.validatePartitionNameCharacters(partVals);
db.loadPartition(new Path(tbd.getSourceDir()), tbd.getTable().getTableName(),
tbd.getPartitionSpec(), tbd.getReplace(), tbd.getHoldDDLTime(),
tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd));
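
The two lines added to MoveTask validate static partition values before the
load, so invalid names are rejected before any data is moved. A hypothetical
sketch of such a character-whitelist check; the pattern and values here are
examples, not Hive's configured defaults:

    import java.util.Arrays;
    import java.util.List;
    import java.util.regex.Pattern;

    public class PartValCheck {
      static void validate(List<String> partVals, Pattern whitelist) {
        for (String v : partVals) {
          if (!whitelist.matcher(v).matches()) {
            throw new IllegalArgumentException("Partition value '" + v
                + "' rejected by whitelist " + whitelist.pattern());
          }
        }
      }

      public static void main(String[] args) {
        Pattern pattern = Pattern.compile("[A-Za-z0-9_\\-]+");
        validate(Arrays.asList("2013-03-02", "us"), pattern); // passes
        try {
          validate(Arrays.asList("a,b"), pattern);            // rejected
        } catch (IllegalArgumentException e) {
          System.out.println(e.getMessage());
        }
      }
    }
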
Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1451954&r1=1451953&r2=1451954&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Sat Mar 2 22:37:59 2013
@@ -138,6 +138,10 @@ public abstract class Operator<T extends
return childOperators;
}
+ public int getNumChild() {
+ return childOperators == null ? 0 : childOperators.size();
+ }
+
/**
* Implements the getChildren function for the Node Interface.
*/
@@ -164,6 +168,10 @@ public abstract class Operator<T extends
return parentOperators;
}
+ public int getNumParent() {
+ return parentOperators == null ? 0 : parentOperators.size();
+ }
+
protected T conf;
protected boolean done;
@@ -1475,6 +1483,15 @@ public abstract class Operator<T extends
this.useBucketizedHiveInputFormat = useBucketizedHiveInputFormat;
}
+ /**
+ * Whether this operator supports automatic sort-merge join.
+ * The operator stack is traversed, and this method is invoked for each operator.
+ * @return true if automatic sort-merge join is supported, false otherwise.
+ */
+ public boolean supportAutomaticSortMergeJoin() {
+ return false;
+ }
+
public boolean supportUnionRemoveOptimization() {
return false;
}
@@ -1496,4 +1513,13 @@ public abstract class Operator<T extends
public boolean opAllowedAfterMapJoin() {
return true;
}
+
+ /*
+ * If this task contains a join, it can be converted to a map-join task if this operator is
+ * present in the mapper. For example, if a sort-merge join operator is present followed by a
+ * regular join, it cannot be converted to an auto map-join.
+ */
+ public boolean opAllowedConvertMapJoin() {
+ return true;
+ }
}
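
The new getNumChild()/getNumParent() accessors and the
supportAutomaticSortMergeJoin()/opAllowedConvertMapJoin() flags follow the
capability-method pattern: an optimizer walks the operator DAG and bails out
as soon as one operator opts out. An illustrative stand-in (Op is not Hive's
Operator):

    import java.util.ArrayList;
    import java.util.List;

    public class CapabilityWalk {
      static class Op {
        List<Op> children = new ArrayList<Op>();
        boolean supportsSmb;
        int getNumChild() { return children == null ? 0 : children.size(); }
        boolean supportAutomaticSortMergeJoin() { return supportsSmb; }
      }

      // True only if every operator reachable from root opts in.
      static boolean wholeTreeSupportsSmb(Op root) {
        if (!root.supportAutomaticSortMergeJoin()) {
          return false;
        }
        for (int i = 0; i < root.getNumChild(); i++) {
          if (!wholeTreeSupportsSmb(root.children.get(i))) {
            return false;
          }
        }
        return true;
      }

      public static void main(String[] args) {
        Op scan = new Op();
        scan.supportsSmb = true;
        Op select = new Op();
        select.supportsSmb = true;
        scan.children.add(select);
        System.out.println(wholeTreeSupportsSmb(scan)); // true
      }
    }
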
Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java?rev=1451954&r1=1451953&r2=1451954&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java Sat Mar 2 22:37:59 2013
@@ -69,12 +69,18 @@ public class SMBMapJoinOperator extends
RowContainer<ArrayList<Object>>[] nextGroupStorage;
RowContainer<ArrayList<Object>>[] candidateStorage;
- transient Map<Byte, String> tagToAlias;
+ transient String[] tagToAlias;
private transient boolean[] fetchDone;
private transient boolean[] foundNextKeyGroup;
transient boolean firstFetchHappened = false;
private transient boolean inputFileChanged = false;
transient boolean localWorkInited = false;
+ transient boolean initDone = false;
+
+ // This join has been converted to an SMB join by the Hive optimizer; the user did not
+ // give a mapjoin hint in the query. The optimizer determined that the join can be
+ // performed as an SMB join, based on the tables/partitions being joined.
+ private transient boolean convertedAutomaticallySMBJoin = false;
public SMBMapJoinOperator() {
}
@@ -85,6 +91,13 @@ public class SMBMapJoinOperator extends
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
+
+ // If there is a sort-merge join followed by a regular join, the SMBJoinOperator may not
+ // get initialized at all. Consider the following query:
+ // A SMB B JOIN C
+ // For the mapper processing C, the SMB join operator is never initialized, so there is no need to close it either.
+ initDone = true;
+
super.initializeOp(hconf);
firstRow = true;
@@ -114,17 +127,17 @@ public class SMBMapJoinOperator extends
HiveConf.ConfVars.HIVEMAPJOINBUCKETCACHESIZE);
for (byte pos = 0; pos < order.length; pos++) {
RowContainer rc = JoinUtil.getRowContainer(hconf,
- rowContainerStandardObjectInspectors.get(pos),
+ rowContainerStandardObjectInspectors[pos],
pos, bucketSize,spillTableDesc, conf, !hasFilter(pos),
reporter);
nextGroupStorage[pos] = rc;
RowContainer candidateRC = JoinUtil.getRowContainer(hconf,
- rowContainerStandardObjectInspectors.get(pos),
- pos,bucketSize,spillTableDesc, conf, !hasFilter(pos),
+ rowContainerStandardObjectInspectors[pos],
+ pos, bucketSize,spillTableDesc, conf, !hasFilter(pos),
reporter);
candidateStorage[pos] = candidateRC;
}
- tagToAlias = conf.getTagToAlias();
+ tagToAlias = conf.convertToArray(conf.getTagToAlias(), String.class);
for (byte pos = 0; pos < order.length; pos++) {
if (pos != posBigTable) {
@@ -194,9 +207,9 @@ public class SMBMapJoinOperator extends
}
private byte tagForAlias(String alias) {
- for (Map.Entry<Byte, String> entry : tagToAlias.entrySet()) {
- if (entry.getValue().equals(alias)) {
- return entry.getKey();
+ for (byte tag = 0; tag < tagToAlias.length; tag++) {
+ if (alias.equals(tagToAlias[tag])) {
+ return tag;
}
}
return -1;
@@ -241,18 +254,18 @@ public class SMBMapJoinOperator extends
byte alias = (byte) tag;
// compute keys and values as StandardObjects
- ArrayList<Object> key = JoinUtil.computeKeys(row, joinKeys.get(alias),
- joinKeysObjectInspectors.get(alias));
- ArrayList<Object> value = JoinUtil.computeValues(row, joinValues.get(alias),
- joinValuesObjectInspectors.get(alias), joinFilters.get(alias),
- joinFilterObjectInspectors.get(alias),
+ ArrayList<Object> key = JoinUtil.computeKeys(row, joinKeys[alias],
+ joinKeysObjectInspectors[alias]);
+ ArrayList<Object> value = JoinUtil.computeValues(row, joinValues[alias],
+ joinValuesObjectInspectors[alias], joinFilters[alias],
+ joinFilterObjectInspectors[alias],
filterMap == null ? null : filterMap[alias]);
//have we reached a new key group?
boolean nextKeyGroup = processKey(alias, key);
if (nextKeyGroup) {
- //assert this.nextGroupStorage.get(alias).size() == 0;
+ //assert this.nextGroupStorage[alias].size() == 0;
this.nextGroupStorage[alias].add(value);
foundNextKeyGroup[tag] = true;
if (tag != posBigTable) {
@@ -365,7 +378,7 @@ public class SMBMapJoinOperator extends
putDummyOrEmpty(index);
continue;
}
- storage.put(index, candidateStorage[index]);
+ storage[index] = candidateStorage[index];
needFetchList.add(index);
if (smallestPos[index] < 0) {
break;
@@ -451,9 +464,9 @@ public class SMBMapJoinOperator extends
private void putDummyOrEmpty(Byte i) {
    // put an empty list or null
if (noOuterJoin) {
- storage.put(i, emptyList);
+ storage[i] = emptyList;
} else {
- storage.put(i, dummyObjVectors[i.intValue()]);
+ storage[i] = dummyObjVectors[i];
}
}
@@ -518,7 +531,7 @@ public class SMBMapJoinOperator extends
}
private void fetchOneRow(byte tag) {
- String table = tagToAlias.get(tag);
+ String table = tagToAlias[tag];
MergeQueue mergeQueue = aliasToMergeQueue.get(table);
// The operator tree till the sink operator has already been processed while
@@ -558,6 +571,15 @@ public class SMBMapJoinOperator extends
}
closeCalled = true;
+ // If there is a sort-merge join followed by a regular join, the SMBJoinOperator may not
+ // get initialized at all. Consider the following query:
+ // A SMB B JOIN C
+ // For the mapper processing C, the SMB join operator is never initialized, so there is no need to close it either.
+ if (!initDone) {
+ return;
+ }
+
+
if (inputFileChanged || !firstFetchHappened) {
//set up the fetch operator for the new input file.
for (Map.Entry<String, MergeQueue> entry : aliasToMergeQueue.entrySet()) {
@@ -620,6 +642,14 @@ public class SMBMapJoinOperator extends
return OperatorType.MAPJOIN;
}
+ public boolean isConvertedAutomaticallySMBJoin() {
+ return convertedAutomaticallySMBJoin;
+ }
+
+ public void setConvertedAutomaticallySMBJoin(boolean convertedAutomaticallySMBJoin) {
+ this.convertedAutomaticallySMBJoin = convertedAutomaticallySMBJoin;
+ }
+
  // returns rows from possibly multiple bucket files of the small table in ascending order
  // by utilizing a priority queue (borrowed from hadoop);
  // elements of the queue (Integer) are indexes into FetchOperator[] (segments)
@@ -750,8 +780,8 @@ public class SMBMapJoinOperator extends
if (keyFields == null) {
byte tag = tagForAlias(alias);
// joinKeys/joinKeysOI are initialized after making merge queue, so setup lazily at runtime
- keyFields = joinKeys.get(tag);
- keyFieldOIs = joinKeysObjectInspectors.get(tag);
+ keyFields = joinKeys[tag];
+ keyFieldOIs = joinKeysObjectInspectors[tag];
}
InspectableObject nextRow = segments[current].getNextRow();
while (nextRow != null) {
@@ -778,4 +808,9 @@ public class SMBMapJoinOperator extends
return false;
}
}
+
+ @Override
+ public boolean opAllowedConvertMapJoin() {
+ return false;
+ }
}
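
The initDone flag added above is a general lifecycle guard: when an operator's
initializeOp may never run (here, the mapper that processes table C never
initializes the SMB join operator), closeOp must tolerate being called on an
uninitialized instance. A minimal sketch of the pattern, with illustrative
names:

    public class GuardedLifecycle {
      private transient boolean initDone = false;

      protected void initializeOp() {
        initDone = true;
        // ... allocate row containers, fetch operators, etc.
      }

      public void closeOp(boolean abort) {
        if (!initDone) {
          return; // nothing was allocated, so there is nothing to tear down
        }
        // ... flush storage, close fetch operators, etc.
      }

      public static void main(String[] args) {
        GuardedLifecycle op = new GuardedLifecycle();
        op.closeOp(false); // safe even though initializeOp never ran
      }
    }
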
Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java?rev=1451954&r1=1451953&r2=1451954&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java Sat Mar 2 22:37:59 2013
@@ -372,6 +372,21 @@ public class ScriptOperator extends Oper
throw new HiveException(e);
} catch (IOException e) {
if (isBrokenPipeException(e) && allowPartialConsumption()) {
+ // Wait for the script process to exit, giving the outThread a chance to finish, before marking the operator as done
+ try {
+ scriptPid.waitFor();
+ } catch (InterruptedException interruptedException) {
+ }
+ // best-effort attempt to write all output from the script before marking the operator
+ // as done
+ try {
+ if (outThread != null) {
+ outThread.join(0);
+ }
+ } catch (Exception e2) {
+ LOG.warn("Exception in closing outThread: "
+ + StringUtils.stringifyException(e2));
+ }
setDone(true);
LOG
.warn("Got broken pipe during write: ignoring exception and setting operator to done");
Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java?rev=1451954&r1=1451953&r2=1451954&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java Sat Mar 2 22:37:59 2013
@@ -112,6 +112,11 @@ public class SelectOperator extends Oper
}
@Override
+ public boolean supportAutomaticSortMergeJoin() {
+ return true;
+ }
+
+ @Override
public boolean supportUnionRemoveOptimization() {
return true;
}
Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java?rev=1451954&r1=1451953&r2=1451954&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java Sat Mar 2 22:37:59 2013
@@ -155,19 +155,17 @@ public class SkewJoinHandler {
}
StructObjectInspector structTblValInpector = ObjectInspectorFactory
.getStandardStructObjectInspector(valColNames,
- joinOp.joinValuesStandardObjectInspectors.get((byte) i));
+ joinOp.joinValuesStandardObjectInspectors[i]);
StructObjectInspector structTblInpector = ObjectInspectorFactory
- .getUnionStructObjectInspector(Arrays
- .asList(new StructObjectInspector[] {structTblValInpector, structTblKeyInpector}));
+ .getUnionStructObjectInspector(Arrays.asList(structTblValInpector, structTblKeyInpector));
skewKeysTableObjectInspector.put((byte) i, structTblInpector);
}
// reset rowcontainer's serde, objectinspector, and tableDesc.
for (int i = 0; i < numAliases; i++) {
Byte alias = conf.getTagOrder()[i];
- RowContainer<ArrayList<Object>> rc = (RowContainer)joinOp.storage.get(Byte
- .valueOf((byte) i));
+ RowContainer<ArrayList<Object>> rc = (RowContainer)joinOp.storage[i];
if (rc != null) {
rc.setSerDe(tblSerializers.get((byte) i), skewKeysTableObjectInspector
.get((byte) i));
@@ -180,8 +178,7 @@ public class SkewJoinHandler {
if (skewKeyInCurrentGroup) {
String specPath = conf.getBigKeysDirMap().get((byte) currBigKeyTag);
- RowContainer<ArrayList<Object>> bigKey = (RowContainer)joinOp.storage.get(Byte
- .valueOf((byte) currBigKeyTag));
+ RowContainer<ArrayList<Object>> bigKey = (RowContainer)joinOp.storage[currBigKeyTag];
Path outputPath = getOperatorOutputPath(specPath);
FileSystem destFs = outputPath.getFileSystem(hconf);
bigKey.copyToDFSDirecory(destFs, outputPath);
@@ -190,8 +187,7 @@ public class SkewJoinHandler {
if (((byte) i) == currBigKeyTag) {
continue;
}
- RowContainer<ArrayList<Object>> values = (RowContainer)joinOp.storage.get(Byte
- .valueOf((byte) i));
+ RowContainer<ArrayList<Object>> values = (RowContainer)joinOp.storage[i];
if (values != null) {
specPath = conf.getSmallKeysDirMap().get((byte) currBigKeyTag).get(
(byte) i);
@@ -218,8 +214,7 @@ public class SkewJoinHandler {
skewKeyInCurrentGroup = false;
for (int i = 0; i < numAliases; i++) {
- RowContainer<ArrayList<Object>> rc = (RowContainer)joinOp.storage.get(Byte
- .valueOf((byte) i));
+ RowContainer<ArrayList<Object>> rc = (RowContainer)joinOp.storage[i];
if (rc != null) {
rc.setKeyObject(dummyKey);
}
Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java?rev=1451954&r1=1451953&r2=1451954&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java Sat Mar 2 22:37:59 2013
@@ -295,4 +295,9 @@ public class TableScanOperator extends O
public boolean supportSkewJoinOptimization() {
return true;
}
+
+ @Override
+ public boolean supportAutomaticSortMergeJoin() {
+ return true;
+ }
}
Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java?rev=1451954&r1=1451953&r2=1451954&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java Sat Mar 2 22:37:59 2013
@@ -65,6 +65,11 @@ public class UDTFOperator extends Operat
udtfInputOIs[i] = inputFields.get(i).getFieldObjectInspector();
}
objToSendToUDTF = new Object[inputFields.size()];
+
+ MapredContext context = MapredContext.get();
+ if (context != null) {
+ context.setup(conf.getGenericUDTF());
+ }
StructObjectInspector udtfOutputOI = conf.getGenericUDTF().initialize(
udtfInputOIs);
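
With the setup call above, a UDTF can receive the MapredContext before
initialize() runs. A sketch of a consuming UDTF, assuming GenericUDTF exposes
configure(MapredContext) (the method context.setup() invokes); the
pass-through UDTF itself is illustrative:

    import java.util.Arrays;

    import org.apache.hadoop.hive.ql.exec.MapredContext;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

    public class PassThroughUDTF extends GenericUDTF {
      @Override
      public void configure(MapredContext context) {
        // Called only inside a MapReduce task, where MapredContext.get()
        // returns non-null -- the same guard the operator code above adds.
      }

      @Override
      public StructObjectInspector initialize(ObjectInspector[] argOIs)
          throws UDFArgumentException {
        return ObjectInspectorFactory.getStandardStructObjectInspector(
            Arrays.asList("col"), Arrays.asList(argOIs[0]));
      }

      @Override
      public void process(Object[] args) throws HiveException {
        forward(args); // emit the input row unchanged
      }

      @Override
      public void close() throws HiveException {
      }
    }
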
Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=1451954&r1=1451953&r2=1451954&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Sat Mar 2 22:37:59 2013
@@ -325,6 +325,13 @@ public class RCFile {
public int[] getEachColumnValueLen() {
return eachColumnValueLen;
}
+
+ /**
+ * @return the numberRows
+ */
+ public int getNumberRows() {
+ return numberRows;
+ }
}
/**
Modified: hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1451954&r1=1451953&r2=1451954&view=diff
==============================================================================
--- hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/branches/ptf-windowing/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Sat Mar 2 22:37:59 2013
@@ -1355,6 +1355,12 @@ private void constructOneLBLocationMap(F
// No leaves in this directory
LOG.info("NOT moving empty directory: " + s.getPath());
} else {
+ try {
+ validatePartitionNameCharacters(
+ Warehouse.getPartValuesFromPartName(s.getPath().getParent().toString()));
+ } catch (MetaException e) {
+ throw new HiveException(e);
+ }
validPartitions.add(s.getPath().getParent());
}
}
@@ -1700,7 +1706,7 @@ private void constructOneLBLocationMap(F
}
}
- private static List<String> getPvals(List<FieldSchema> partCols,
+ public static List<String> getPvals(List<FieldSchema> partCols,
Map<String, String> partSpec) {
List<String> pvals = new ArrayList<String>();
for (FieldSchema field : partCols) {
@@ -1873,6 +1879,15 @@ private void constructOneLBLocationMap(F
return results;
}
+ public void validatePartitionNameCharacters(List<String> partVals) throws HiveException {
+ try {
+ getMSC().validatePartitionNameCharacters(partVals);
+ } catch (Exception e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw new HiveException(e);
+ }
+ }
+
/**
* Get the name of the current database
* @return the current database name