Posted to commits@hive.apache.org by ha...@apache.org on 2013/04/26 21:16:13 UTC
svn commit: r1476348 [8/29] - in /hive/branches/vectorization: ./ beeline/
beeline/src/java/org/apache/hive/beeline/ beeline/src/test/org/
beeline/src/test/org/apache/ beeline/src/test/org/apache/hive/
beeline/src/test/org/apache/hive/beeline/ beeline/...
Modified: hive/branches/vectorization/metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStoreWithEnvironmentContext.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStoreWithEnvironmentContext.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStoreWithEnvironmentContext.java (original)
+++ hive/branches/vectorization/metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStoreWithEnvironmentContext.java Fri Apr 26 19:14:49 2013
@@ -60,7 +60,7 @@ public class TestHiveMetaStoreWithEnviro
private Table table = new Table();
private final Partition partition = new Partition();
- private static final String dbName = "tmpdb";
+ private static final String dbName = "hive3252";
private static final String tblName = "tmptbl";
private static final String renamed = "tmptbl2";
Modified: hive/branches/vectorization/metastore/src/test/org/apache/hadoop/hive/metastore/TestMarkPartition.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/metastore/src/test/org/apache/hadoop/hive/metastore/TestMarkPartition.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/metastore/src/test/org/apache/hadoop/hive/metastore/TestMarkPartition.java (original)
+++ hive/branches/vectorization/metastore/src/test/org/apache/hadoop/hive/metastore/TestMarkPartition.java Fri Apr 26 19:14:49 2013
@@ -62,36 +62,36 @@ public class TestMarkPartition extends T
InvalidPartitionException, UnknownPartitionException, InterruptedException {
HiveMetaStoreClient msc = new HiveMetaStoreClient(hiveConf, null);
driver = new Driver(hiveConf);
- driver.run("drop database if exists tmpdb cascade");
- driver.run("create database tmpdb");
- driver.run("use tmpdb");
+ driver.run("drop database if exists hive2215 cascade");
+ driver.run("create database hive2215");
+ driver.run("use hive2215");
driver.run("drop table if exists tmptbl");
driver.run("create table tmptbl (a string) partitioned by (b string)");
driver.run("alter table tmptbl add partition (b='2011')");
Map<String,String> kvs = new HashMap<String, String>();
kvs.put("b", "'2011'");
- msc.markPartitionForEvent("tmpdb", "tmptbl", kvs, PartitionEventType.LOAD_DONE);
- assert msc.isPartitionMarkedForEvent("tmpdb", "tmptbl", kvs, PartitionEventType.LOAD_DONE);
+ msc.markPartitionForEvent("hive2215", "tmptbl", kvs, PartitionEventType.LOAD_DONE);
+ assert msc.isPartitionMarkedForEvent("hive2215", "tmptbl", kvs, PartitionEventType.LOAD_DONE);
Thread.sleep(10000);
- assert !msc.isPartitionMarkedForEvent("tmpdb", "tmptbl", kvs, PartitionEventType.LOAD_DONE);
+ assert !msc.isPartitionMarkedForEvent("hive2215", "tmptbl", kvs, PartitionEventType.LOAD_DONE);
kvs.put("b", "'2012'");
- assert !msc.isPartitionMarkedForEvent("tmpdb", "tmptbl", kvs, PartitionEventType.LOAD_DONE);
+ assert !msc.isPartitionMarkedForEvent("hive2215", "tmptbl", kvs, PartitionEventType.LOAD_DONE);
try{
- msc.markPartitionForEvent("tmpdb", "tmptbl2", kvs, PartitionEventType.LOAD_DONE);
+ msc.markPartitionForEvent("hive2215", "tmptbl2", kvs, PartitionEventType.LOAD_DONE);
assert false;
} catch(Exception e){
assert e instanceof UnknownTableException;
}
try{
- msc.isPartitionMarkedForEvent("tmpdb", "tmptbl2", kvs, PartitionEventType.LOAD_DONE);
+ msc.isPartitionMarkedForEvent("hive2215", "tmptbl2", kvs, PartitionEventType.LOAD_DONE);
assert false;
} catch(Exception e){
assert e instanceof UnknownTableException;
}
kvs.put("a", "'2012'");
try{
- msc.isPartitionMarkedForEvent("tmpdb", "tmptbl", kvs, PartitionEventType.LOAD_DONE);
+ msc.isPartitionMarkedForEvent("hive2215", "tmptbl", kvs, PartitionEventType.LOAD_DONE);
assert false;
} catch(Exception e){
assert e instanceof InvalidPartitionException;
@@ -100,7 +100,7 @@ public class TestMarkPartition extends T
@Override
protected void tearDown() throws Exception {
- driver.run("drop database if exists tmpdb cascade");
+ driver.run("drop database if exists hive2215 cascade");
super.tearDown();
}
Modified: hive/branches/vectorization/metastore/src/test/org/apache/hadoop/hive/metastore/TestMetaStoreEndFunctionListener.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/metastore/src/test/org/apache/hadoop/hive/metastore/TestMetaStoreEndFunctionListener.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/metastore/src/test/org/apache/hadoop/hive/metastore/TestMetaStoreEndFunctionListener.java (original)
+++ hive/branches/vectorization/metastore/src/test/org/apache/hadoop/hive/metastore/TestMetaStoreEndFunctionListener.java Fri Apr 26 19:14:49 2013
@@ -69,7 +69,7 @@ public class TestMetaStoreEndFunctionLis
/* Objective here is to ensure that when exceptions are thrown in HiveMetaStore in API methods
* they bubble up and are stored in the MetaStoreEndFunctionContext objects
*/
- String dbName = "tmpdb";
+ String dbName = "hive3524";
String tblName = "tmptbl";
int listSize = 0;
@@ -109,7 +109,7 @@ public class TestMetaStoreEndFunctionLis
assertEquals(context.getInputTableName(), tableName);
try {
- msc.getPartition("tmpdb", tblName, "b=2012");
+ msc.getPartition("hive3524", tblName, "b=2012");
}
catch (Exception e2) {
}
Modified: hive/branches/vectorization/metastore/src/test/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/metastore/src/test/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/metastore/src/test/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java (original)
+++ hive/branches/vectorization/metastore/src/test/org/apache/hadoop/hive/metastore/TestMetaStoreEventListener.java Fri Apr 26 19:14:49 2013
@@ -65,7 +65,7 @@ public class TestMetaStoreEventListener
private HiveMetaStoreClient msc;
private Driver driver;
- private static final String dbName = "tmpdb";
+ private static final String dbName = "hive2038";
private static final String tblName = "tmptbl";
private static final String renamed = "tmptbl2";
@@ -217,7 +217,7 @@ public class TestMetaStoreEventListener
driver.run("alter table tmptbl add partition (b='2011')");
listSize++;
- Partition part = msc.getPartition("tmpdb", "tmptbl", "b=2011");
+ Partition part = msc.getPartition("hive2038", "tmptbl", "b=2011");
assertEquals(notifyList.size(), listSize);
assertEquals(preNotifyList.size(), listSize);
@@ -304,7 +304,7 @@ public class TestMetaStoreEventListener
Map<String,String> kvs = new HashMap<String, String>(1);
kvs.put("b", "2011");
- msc.markPartitionForEvent("tmpdb", "tmptbl", kvs, PartitionEventType.LOAD_DONE);
+ msc.markPartitionForEvent("hive2038", "tmptbl", kvs, PartitionEventType.LOAD_DONE);
listSize++;
assertEquals(notifyList.size(), listSize);
assertEquals(preNotifyList.size(), listSize);
Modified: hive/branches/vectorization/metastore/src/test/org/apache/hadoop/hive/metastore/TestRetryingHMSHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/metastore/src/test/org/apache/hadoop/hive/metastore/TestRetryingHMSHandler.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/metastore/src/test/org/apache/hadoop/hive/metastore/TestRetryingHMSHandler.java (original)
+++ hive/branches/vectorization/metastore/src/test/org/apache/hadoop/hive/metastore/TestRetryingHMSHandler.java Fri Apr 26 19:14:49 2013
@@ -68,7 +68,7 @@ public class TestRetryingHMSHandler exte
// Create a database and a table in that database. Because the AlternateFailurePreListener is
// being used each attempt to create something should require two calls by the RetryingHMSHandler
public void testRetryingHMSHandler() throws Exception {
- String dbName = "tmpdb";
+ String dbName = "hive4159";
String tblName = "tmptbl";
Database db = new Database();
Modified: hive/branches/vectorization/ql/build.xml
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/build.xml?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/build.xml (original)
+++ hive/branches/vectorization/ql/build.xml Fri Apr 26 19:14:49 2013
@@ -201,7 +201,7 @@
<!-- Override jar target to specify main class and compiler stuff -->
- <target name="jar" depends="compile">
+ <target name="jar" depends="make-pom,compile">
<echo message="Project: ${ant.project.name}"/>
<unzip src="${build.ivy.lib.dir}/default/libthrift-${libthrift.version}.jar" dest="${build.dir.hive}/thrift/classes">
<patternset>
Modified: hive/branches/vectorization/ql/ivy.xml
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/ivy.xml?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/ivy.xml (original)
+++ hive/branches/vectorization/ql/ivy.xml Fri Apr 26 19:14:49 2013
@@ -28,8 +28,6 @@
<dependencies>
<dependency org="org.apache.hive" name="hive-metastore" rev="${version}"
conf="compile->default" />
- <dependency org="org.apache.hive" name="hive-builtins" rev="${version}"
- conf="test->default,runtime" transitive="false"/>
<dependency org="org.apache.hive" name="hive-hbase-handler" rev="${version}"
conf="test->default" transitive="false"/>
<dependency org="org.apache.hive" name="hive-contrib" rev="${version}"
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Fri Apr 26 19:14:49 2013
@@ -1056,6 +1056,10 @@ public class Driver implements CommandPr
conf.setVar(HiveConf.ConfVars.HIVEQUERYID, queryId);
conf.setVar(HiveConf.ConfVars.HIVEQUERYSTRING, queryStr);
+
+ conf.set("mapreduce.workflow.id", "hive_"+queryId);
+ conf.set("mapreduce.workflow.name", queryStr);
+
maxthreads = HiveConf.getIntVar(conf, HiveConf.ConfVars.EXECPARALLETHREADNUMBER);
try {
@@ -1334,6 +1338,8 @@ public class Driver implements CommandPr
if (noName) {
conf.setVar(HiveConf.ConfVars.HADOOPJOBNAME, jobname + "(" + tsk.getId() + ")");
}
+ conf.set("mapreduce.workflow.node.name", tsk.getId());
+ Utilities.setWorkflowAdjacencies(conf, plan);
cxt.incCurJobNo(1);
console.printInfo("Launching Job " + cxt.getCurJobNo() + " out of " + jobs);
}
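
The workflow properties set in this hunk are ordinary JobConf entries, so any tool watching submitted jobs can read them back by key. A minimal sketch of that round trip, assuming Hadoop on the classpath (the class name and sample values are illustrative, not part of the commit):

    import org.apache.hadoop.mapred.JobConf;

    public class WorkflowInfoDump {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        // Driver derives these from the query id and query string.
        job.set("mapreduce.workflow.id", "hive_exampleQueryId");
        job.set("mapreduce.workflow.name", "select count(1) from src");
        job.set("mapreduce.workflow.node.name", "Stage-1");
        // A monitoring tool reads the same keys off the submitted job.
        System.out.println(job.get("mapreduce.workflow.id"));
        System.out.println(job.get("mapreduce.workflow.name"));
        System.out.println(job.get("mapreduce.workflow.node.name"));
      }
    }
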
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java Fri Apr 26 19:14:49 2013
@@ -301,7 +301,7 @@ public enum ErrorMsg {
"Cannot ALTER VIEW AS SELECT if view currently does not exist\n"),
REPLACE_VIEW_WITH_PARTITION(10217,
"Cannot replace a view with CREATE VIEW or REPLACE VIEW or " +
- "ALTER VIEW AS SELECT if the view has paritions\n"),
+ "ALTER VIEW AS SELECT if the view has partitions\n"),
EXISTING_TABLE_IS_NOT_VIEW(10218,
"Existing table is not a view\n"),
NO_SUPPORTED_ORDERBY_ALLCOLREF_POS(10219,
@@ -321,6 +321,13 @@ public enum ErrorMsg {
"with distincts. Either set hive.new.job.grouping.set.cardinality to a high number " +
"(higher than the number of rows per input row due to grouping sets in the query), or " +
"rewrite the query to not use distincts."),
+ TRUNCATE_COLUMN_INDEXED_TABLE(10227, "Can not truncate columns from table with indexes"),
+ TRUNCATE_COLUMN_NOT_RC(10228, "Only RCFileFormat supports column truncation."),
+ TRUNCATE_COLUMN_ARCHIVED(10229, "Column truncation cannot be performed on archived partitions."),
+ TRUNCATE_BUCKETED_COLUMN(10230,
+ "A column on which a partition/table is bucketed cannot be truncated."),
+ TRUNCATE_LIST_BUCKETED_COLUMN(10231,
+ "A column on which a partition/table is list bucketed cannot be truncated."),
OPERATOR_NOT_ALLOWED_WITH_MAPJOIN(10227,
"Not all clauses are supported with mapjoin hint. Please remove mapjoin hint."),
@@ -338,6 +345,10 @@ public enum ErrorMsg {
+ "fails to construct aggregation for the partition "),
ANALYZE_TABLE_PARTIALSCAN_AUTOGATHER(10233, "Analyze partialscan is not allowed " +
"if hive.stats.autogather is set to false"),
+ PARTITION_VALUE_NOT_CONTINUOUS(10234, "Partition values specified are not continuous." +
+ " A subpartition value is specified without specifying the parent partition's value"),
+ TABLES_INCOMPATIBLE_SCHEMAS(10235, "Tables have incompatible schemas and their partitions" +
+ " cannot be exchanged."),
SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."),
SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. "
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java Fri Apr 26 19:14:49 2013
@@ -167,5 +167,4 @@ public abstract class AbstractMapJoinOpe
protected boolean hasAnyNulls(AbstractMapJoinKey key) {
return key.hasAnyNulls(nullsafes);
}
-
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Fri Apr 26 19:14:49 2013
@@ -40,10 +40,10 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.Map.Entry;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang.StringUtils;
@@ -86,6 +86,8 @@ import org.apache.hadoop.hive.ql.hooks.R
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.io.rcfile.merge.BlockMergeTask;
import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
+import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateTask;
+import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateWork;
import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
@@ -109,10 +111,10 @@ import org.apache.hadoop.hive.ql.parse.B
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.AlterIndexDesc;
+import org.apache.hadoop.hive.ql.plan.AlterTableAlterPartDesc;
import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
-import org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes;
import org.apache.hadoop.hive.ql.plan.AlterTableSimpleDesc;
-import org.apache.hadoop.hive.ql.plan.AlterTableAlterPartDesc;
+import org.apache.hadoop.hive.ql.plan.AlterTableExchangePartition;
import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.CreateIndexDesc;
import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
@@ -150,6 +152,7 @@ import org.apache.hadoop.hive.ql.plan.Sh
import org.apache.hadoop.hive.ql.plan.SwitchDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.TruncateTableDesc;
import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
+import org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.security.authorization.Privilege;
import org.apache.hadoop.hive.serde.serdeConstants;
@@ -429,6 +432,12 @@ public class DDLTask extends Task<DDLWor
return truncateTable(db, truncateTableDesc);
}
+ AlterTableExchangePartition alterTableExchangePartition =
+ work.getAlterTableExchangePartition();
+ if (alterTableExchangePartition != null) {
+ return exchangeTablePartition(db, alterTableExchangePartition);
+ }
+
} catch (InvalidTableException e) {
formatter.consoleError(console, "Table " + e.getTableName() + " does not exist",
formatter.MISSING);
@@ -2323,7 +2332,8 @@ public class DDLTask extends Task<DDLWor
List<FieldSchema> cols = table.getCols();
cols.addAll(table.getPartCols());
- outStream.writeBytes(MetaDataFormatUtils.getAllColumnsInformation(cols));
+ outStream.writeBytes(
+ MetaDataFormatUtils.getAllColumnsInformation(cols, false));
((FSDataOutputStream) outStream).close();
outStream = null;
} catch (IOException e) {
@@ -3950,6 +3960,21 @@ public class DDLTask extends Task<DDLWor
}
private int truncateTable(Hive db, TruncateTableDesc truncateTableDesc) throws HiveException {
+
+ if (truncateTableDesc.getColumnIndexes() != null) {
+ ColumnTruncateWork truncateWork = new ColumnTruncateWork(
+ truncateTableDesc.getColumnIndexes(), truncateTableDesc.getInputDir(),
+ truncateTableDesc.getOutputDir());
+ truncateWork.setListBucketingCtx(truncateTableDesc.getLbCtx());
+ truncateWork.setMapperCannotSpanPartns(true);
+ DriverContext driverCxt = new DriverContext();
+ ColumnTruncateTask taskExec = new ColumnTruncateTask();
+ taskExec.initialize(db.getConf(), null, driverCxt);
+ taskExec.setWork(truncateWork);
+ taskExec.setQueryPlan(this.getQueryPlan());
+ return taskExec.execute(driverCxt);
+ }
+
String tableName = truncateTableDesc.getTableName();
Map<String, String> partSpec = truncateTableDesc.getPartSpec();
@@ -3968,6 +3993,17 @@ public class DDLTask extends Task<DDLWor
return 0;
}
+ private int exchangeTablePartition(Hive db,
+ AlterTableExchangePartition exchangePartition) throws HiveException {
+ Map<String, String> partitionSpecs = exchangePartition.getPartitionSpecs();
+ Table destTable = exchangePartition.getDestinationTable();
+ Table sourceTable = exchangePartition.getSourceTable();
+ db.exchangeTablePartitions(partitionSpecs, sourceTable.getDbName(),
+ sourceTable.getTableName(),destTable.getDbName(),
+ destTable.getTableName());
+ return 0;
+ }
+
private List<Path> getLocations(Hive db, Table table, Map<String, String> partSpec)
throws HiveException {
List<Path> locations = new ArrayList<Path>();
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Fri Apr 26 19:14:49 2013
@@ -412,7 +412,7 @@ public class ExecDriver extends Task<Map
LOG.info("Add 1 archive file to distributed cache. Archive file: " + hdfsFilePath.toUri());
}
}
-
+ work.configureJobConf(job);
addInputPaths(job, work, emptyScratchDirStr, ctx);
Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java Fri Apr 26 19:14:49 2013
@@ -50,7 +50,6 @@ public class ExecMapper extends MapReduc
private JobConf jc;
private boolean abort = false;
private Reporter rp;
- private List<OperatorHook> opHooks;
public static final Log l4j = LogFactory.getLog("ExecMapper");
private static boolean done;
@@ -99,7 +98,6 @@ public class ExecMapper extends MapReduc
mo.setExecContext(execContext);
mo.initializeLocalWork(jc);
mo.initialize(jc, null);
- opHooks = OperatorHookUtils.getOperatorHooks(jc);
if (localWork == null) {
return;
@@ -132,7 +130,6 @@ public class ExecMapper extends MapReduc
rp = reporter;
mo.setOutputCollector(oc);
mo.setReporter(rp);
- mo.setOperatorHooks(opHooks);
MapredContext.get().setReporter(reporter);
}
// reset the execContext for each new row
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java Fri Apr 26 19:14:49 2013
@@ -66,7 +66,6 @@ public class ExecReducer extends MapRedu
private long nextCntr = 1;
private static String[] fieldNames;
- private List<OperatorHook> opHooks;
public static final Log l4j = LogFactory.getLog("ExecReducer");
private boolean isLogInfoEnabled = false;
@@ -152,7 +151,6 @@ public class ExecReducer extends MapRedu
try {
l4j.info(reducer.dump(0));
reducer.initialize(jc, rowObjectInspector);
- opHooks = OperatorHookUtils.getOperatorHooks(jc);
} catch (Throwable e) {
abort = true;
if (e instanceof OutOfMemoryError) {
@@ -183,7 +181,6 @@ public class ExecReducer extends MapRedu
rp = reporter;
reducer.setOutputCollector(oc);
reducer.setReporter(rp);
- reducer.setOperatorHooks(opHooks);
MapredContext.get().setReporter(reporter);
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Fri Apr 26 19:14:49 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.metastore.api.SkewedValueList;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveKey;
@@ -708,7 +709,7 @@ public class FileSinkOperator extends Te
List<String> skewedCols = lbCtx.getSkewedColNames();
List<List<String>> allSkewedVals = lbCtx.getSkewedColValues();
List<String> skewedValsCandidate = null;
- Map<List<String>, String> locationMap = lbCtx.getLbLocationMap();
+ Map<SkewedValueList, String> locationMap = lbCtx.getLbLocationMap();
/* Convert input row to standard objects. */
ObjectInspectorUtils.copyToStandardObject(standObjs, row,
@@ -726,14 +727,14 @@ public class FileSinkOperator extends Te
if (allSkewedVals.contains(skewedValsCandidate)) {
/* matches skewed values. */
lbDirName = FileUtils.makeListBucketingDirName(skewedCols, skewedValsCandidate);
- locationMap.put(skewedValsCandidate, lbDirName);
+ locationMap.put(new SkewedValueList(skewedValsCandidate), lbDirName);
} else {
/* create default directory. */
lbDirName = FileUtils.makeDefaultListBucketingDirName(skewedCols,
lbCtx.getDefaultDirName());
List<String> defaultKey = Arrays.asList(lbCtx.getDefaultKey());
if (!locationMap.containsKey(defaultKey)) {
- locationMap.put(defaultKey, lbDirName);
+ locationMap.put(new SkewedValueList(defaultKey), lbDirName);
}
}
return lbDirName;
@@ -754,7 +755,9 @@ public class FileSinkOperator extends Te
// check # of dp
if (valToPaths.size() > maxPartitions) {
// throw fatal error
- incrCounter(fatalErrorCntr, 1);
+ if (counterNameToEnum != null) {
+ incrCounter(fatalErrorCntr, 1);
+ }
fatalError = true;
LOG.error("Fatal error was thrown due to exceeding number of dynamic partitions");
}
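
The hunk above switches the list-bucketing location map from raw List<String> keys to the Thrift-generated SkewedValueList wrapper. The pattern depends on the key type having value-based equals/hashCode; a self-contained sketch with a hypothetical stand-in for the wrapper:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class SkewedKeyDemo {
      // Hypothetical stand-in for the Thrift SkewedValueList wrapper.
      static final class ValueListKey {
        private final List<String> values;
        ValueListKey(List<String> values) { this.values = values; }
        @Override public boolean equals(Object o) {
          return o instanceof ValueListKey && values.equals(((ValueListKey) o).values);
        }
        @Override public int hashCode() { return values.hashCode(); }
      }

      public static void main(String[] args) {
        Map<ValueListKey, String> locationMap = new HashMap<ValueListKey, String>();
        locationMap.put(new ValueListKey(Arrays.asList("key1", "484")),
            "/warehouse/t/key1=484");
        // An equal-but-distinct key instance still hits, because equality
        // is value-based rather than reference-based.
        System.out.println(locationMap.get(
            new ValueListKey(Arrays.asList("key1", "484"))));
      }
    }
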
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Fri Apr 26 19:14:49 2013
@@ -58,12 +58,12 @@ import org.apache.hadoop.hive.serde2.laz
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.UnionObject;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -90,8 +90,6 @@ public class GroupByOperator extends Ope
protected transient ObjectInspector[][] aggregationParameterObjectInspectors;
protected transient ObjectInspector[][] aggregationParameterStandardObjectInspectors;
protected transient Object[][] aggregationParameterObjects;
- // In the future, we may allow both count(DISTINCT a) and sum(DISTINCT a) in
- // the same SQL clause,
// so aggregationIsDistinct is a boolean array instead of a single number.
protected transient boolean[] aggregationIsDistinct;
// Map from integer tag to distinct aggrs
@@ -887,8 +885,15 @@ public class GroupByOperator extends Ope
// Forward the current keys if needed for sort-based aggregation
if (currentKeys != null && !keysAreEqual) {
- forward(currentKeys.getKeyArray(), aggregations);
- countAfterReport = 0;
+ // This is to optimize queries of the form:
+ // select count(distinct key) from T
+ // where T is sorted and bucketized by key
+ // Partial aggregation is performed on the mapper, and the
+ // reducer gets 1 row (partial result) per mapper.
+ if (!conf.isDontResetAggrsDistinct()) {
+ forward(currentKeys.getKeyArray(), aggregations);
+ countAfterReport = 0;
+ }
}
// Need to update the keys?
@@ -900,7 +905,10 @@ public class GroupByOperator extends Ope
}
// Reset the aggregations
- resetAggregations(aggregations);
+ // For distincts optimization with sorting/bucketing, perform partial aggregation
+ if (!conf.isDontResetAggrsDistinct()) {
+ resetAggregations(aggregations);
+ }
// clear parameters in last-invoke
for (int i = 0; i < aggregationsParametersLastInvoke.length; i++) {
@@ -1076,7 +1084,7 @@ public class GroupByOperator extends Ope
try {
// put the hash related stats in statsMap if applicable, so that they
// are sent to jt as counters
- if (hashAggr) {
+ if (hashAggr && counterNameToEnum != null) {
incrCounter(counterNameHashOut, numRowsHashTbl);
}
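
The isDontResetAggrsDistinct guard above keeps one partial aggregation alive across key changes, so a mapper reading sorted/bucketed input emits a single partial row for the reducer to merge. A toy sketch of that control flow, assuming the same flag semantics (this is not the Hive operator API):

    import java.util.Arrays;
    import java.util.List;

    public class SortedDistinctSketch {
      public static void main(String[] args) {
        // Input arrives sorted on the key, as the optimization requires.
        List<String> sortedKeys = Arrays.asList("a", "a", "b", "b", "c");
        boolean dontResetAggrsDistinct = true; // mapper-side partial aggregation
        long distinct = 0;
        String currentKey = null;
        for (String key : sortedKeys) {
          boolean keysAreEqual = key.equals(currentKey);
          if (currentKey != null && !keysAreEqual && !dontResetAggrsDistinct) {
            System.out.println(currentKey + " -> forward and reset"); // per-key row
            distinct = 0;
          }
          if (!keysAreEqual) {
            distinct++; // with the flag set, keep aggregating across keys
            currentKey = key;
          }
        }
        // One partial result per mapper; the reducer merges these.
        System.out.println("partial count(distinct) -> " + distinct);
      }
    }
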
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java Fri Apr 26 19:14:49 2013
@@ -54,6 +54,7 @@ public class HashTableSinkOperator exten
private static final long serialVersionUID = 1L;
private static final Log LOG = LogFactory.getLog(HashTableSinkOperator.class.getName());
+ protected static MapJoinMetaData metadata = new MapJoinMetaData();
// from abstract map join operator
/**
* The expressions for join inputs's join keys.
@@ -164,6 +165,10 @@ public class HashTableSinkOperator exten
}
+ public static MapJoinMetaData getMetadata() {
+ return metadata;
+ }
+
private static final transient String[] FATAL_ERR_MSG = {
null, // counter value 0 means no error
"Mapside join exceeds available memory. "
@@ -301,8 +306,7 @@ public class HashTableSinkOperator exten
null);
keySerializer.initialize(null, keyTableDesc.getProperties());
- MapJoinMetaData.clear();
- MapJoinMetaData.put(Integer.valueOf(metadataKeyTag), new HashTableSinkObjectCtx(
+ metadata.put(Integer.valueOf(metadataKeyTag), new HashTableSinkObjectCtx(
ObjectInspectorUtils.getStandardObjectInspector(keySerializer.getObjectInspector(),
ObjectInspectorCopyOption.WRITABLE), keySerializer, keyTableDesc, false, hconf));
}
@@ -349,7 +353,8 @@ public class HashTableSinkOperator exten
// Construct externalizable objects for key and value
if (needNewKey) {
- MapJoinObjectValue valueObj = new MapJoinObjectValue(metadataValueTag[tag], res);
+ MapJoinObjectValue valueObj = new MapJoinObjectValue(
+ metadataValueTag[tag], res);
rowNumber++;
if (rowNumber > hashTableScale && rowNumber % hashTableScale == 0) {
@@ -391,7 +396,7 @@ public class HashTableSinkOperator exten
.getStandardStructObjectInspector(newNames, newFields);
int alias = Integer.valueOf(metadataValueTag[tag]);
- MapJoinMetaData.put(alias, new HashTableSinkObjectCtx(
+ metadata.put(Integer.valueOf(metadataValueTag[tag]), new HashTableSinkObjectCtx(
standardOI, valueSerDe, valueTableDesc, hasFilter(alias), hconf));
}
@@ -435,7 +440,7 @@ public class HashTableSinkOperator exten
super.closeOp(abort);
} catch (Exception e) {
- LOG.error("Generate Hashtable error");
+ LOG.error("Generate Hashtable error", e);
e.printStackTrace();
}
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Fri Apr 26 19:14:49 2013
@@ -269,4 +269,14 @@ public class JoinOperator extends Common
// optimizations for now.
return false;
}
+
+ @Override
+ public boolean opAllowedBeforeSortMergeJoin() {
+ // If a join occurs before the sort-merge join, it is not useful to convert the sort-merge
+ // join to a mapjoin. It might be simpler to perform the join and then the sort-merge
+ // join. By converting the sort-merge join to a map-join, the job will be executed as 2
+ // mapjoins in the best case. The number of inputs for the join is more than 1 so it would
+ // be difficult to figure out the big table for the mapjoin.
+ return false;
+ }
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinMetaData.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinMetaData.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinMetaData.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinMetaData.java Fri Apr 26 19:14:49 2013
@@ -24,20 +24,21 @@ import java.util.Map;
import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx;
public class MapJoinMetaData {
- static transient Map<Integer, HashTableSinkObjectCtx> mapMetadata = new HashMap<Integer, HashTableSinkObjectCtx>();
+ transient Map<Integer, HashTableSinkObjectCtx> mapMetadata =
+ new HashMap<Integer, HashTableSinkObjectCtx>();
static ArrayList<Object> list = new ArrayList<Object>();
public MapJoinMetaData(){
}
- public static void put(Integer key, HashTableSinkObjectCtx value){
+ public void put(Integer key, HashTableSinkObjectCtx value){
mapMetadata.put(key, value);
}
- public static HashTableSinkObjectCtx get(Integer key){
+ public HashTableSinkObjectCtx get(Integer key){
return mapMetadata.get(key);
}
- public static void clear(){
+ public void clear(){
mapMetadata.clear();
}
@@ -45,5 +46,4 @@ public class MapJoinMetaData {
list.clear();
return list;
}
-
}
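
With the map no longer static, each operator that needs serialization metadata owns an instance reachable through getMetadata(), instead of every operator in the JVM sharing (and clearing) one global table. A minimal illustration of the hazard the change removes, using a hypothetical stripped-down metadata type:

    import java.util.HashMap;
    import java.util.Map;

    public class PerInstanceMetadataDemo {
      static final class Metadata {
        private final Map<Integer, String> map = new HashMap<Integer, String>();
        void put(Integer key, String ctx) { map.put(key, ctx); }
        String get(Integer key) { return map.get(key); }
      }

      public static void main(String[] args) {
        // Two operators running in one JVM each get their own metadata, so
        // one operator's put() or clear() can no longer clobber the other's
        // entries, which was possible when the map was a static field.
        Metadata opA = new Metadata();
        Metadata opB = new Metadata();
        opA.put(0, "key ctx for operator A");
        opB.put(0, "key ctx for operator B");
        System.out.println(opA.get(0));
        System.out.println(opB.get(0));
      }
    }
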
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Fri Apr 26 19:14:49 2013
@@ -54,6 +54,11 @@ public class MapJoinOperator extends Abs
protected transient HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>[] mapJoinTables;
+ protected static MapJoinMetaData metadata = new MapJoinMetaData();
+ public static MapJoinMetaData getMetadata() {
+ return metadata;
+ }
+
private static final transient String[] FATAL_ERR_MSG = {
null, // counter value 0 means no error
"Mapside join exceeds available memory. "
@@ -117,7 +122,7 @@ public class MapJoinOperator extends Abs
SerDe keySerializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(),
null);
keySerializer.initialize(null, keyTableDesc.getProperties());
- MapJoinMetaData.put(Integer.valueOf(metadataKeyTag), new HashTableSinkObjectCtx(
+ metadata.put(Integer.valueOf(metadataKeyTag), new HashTableSinkObjectCtx(
ObjectInspectorUtils.getStandardObjectInspector(keySerializer.getObjectInspector(),
ObjectInspectorCopyOption.WRITABLE), keySerializer, keyTableDesc, false, hconf));
@@ -136,7 +141,7 @@ public class MapJoinOperator extends Abs
valueSerDe.initialize(null, valueTableDesc.getProperties());
ObjectInspector inspector = valueSerDe.getObjectInspector();
- MapJoinMetaData.put(Integer.valueOf(pos), new HashTableSinkObjectCtx(ObjectInspectorUtils
+ metadata.put(Integer.valueOf(pos), new HashTableSinkObjectCtx(ObjectInspectorUtils
.getStandardObjectInspector(inspector, ObjectInspectorCopyOption.WRITABLE),
valueSerDe, valueTableDesc, hasFilter(pos), hconf));
}
@@ -189,8 +194,8 @@ public class MapJoinOperator extends Abs
hashtable.initilizePersistentHash(path.toUri().getPath());
}
} catch (Exception e) {
- LOG.error("Load Distributed Cache Error");
- throw new HiveException(e.getMessage());
+ LOG.error("Load Distributed Cache Error", e);
+ throw new HiveException(e);
}
}
@@ -230,11 +235,8 @@ public class MapJoinOperator extends Abs
// compute keys and values as StandardObjects
AbstractMapJoinKey key = JoinUtil.computeMapJoinKeys(row, joinKeys[alias],
joinKeysObjectInspectors[alias]);
- ArrayList<Object> value = getFilteredValue(alias, row);
-
- // Add the value to the ArrayList
- storage[alias].add(value);
+ boolean joinNeeded = false;
for (byte pos = 0; pos < order.length; pos++) {
if (pos != alias) {
@@ -243,12 +245,14 @@ public class MapJoinOperator extends Abs
// there is no join-value or join-key has all null elements
if (o == null || key.hasAnyNulls(nullsafes)) {
- if (noOuterJoin) {
- storage[pos] = emptyList;
- } else {
+ if (!noOuterJoin) {
+ joinNeeded = true;
storage[pos] = dummyObjVectors[pos];
+ } else {
+ storage[pos] = emptyList;
}
} else {
+ joinNeeded = true;
rowContainer.reset(o.getObj());
storage[pos] = rowContainer;
aliasFilterTags[pos] = o.getAliasFilter();
@@ -256,8 +260,15 @@ public class MapJoinOperator extends Abs
}
}
- // generate the output records
- checkAndGenObject();
+ if (joinNeeded) {
+ ArrayList<Object> value = getFilteredValue(alias, row);
+
+ // Add the value to the ArrayList
+ storage[alias].add(value);
+
+ // generate the output records
+ checkAndGenObject();
+ }
// done with the row
storage[tag].clear();
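
The processOp rewrite above computes joinNeeded first and only materializes the filtered big-table value and generates output when a match exists or outer-join dummy rows are required. A toy probe loop with the same shape (all names are illustrative):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    public class InnerJoinProbeSketch {
      public static void main(String[] args) {
        Map<String, String> hashTable = new HashMap<String, String>();
        hashTable.put("k1", "small-side-row");
        boolean noOuterJoin = true; // inner join
        for (String bigRowKey : Arrays.asList("k1", "k2", "k1")) {
          String matched = hashTable.get(bigRowKey);
          boolean joinNeeded = matched != null || !noOuterJoin;
          if (joinNeeded) {
            // Only now pay for materializing the big-table value and emitting.
            System.out.println(bigRowKey + " joined with " + matched);
          }
          // Unmatched inner-join rows fall through without any output work.
        }
      }
    }
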
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Fri Apr 26 19:14:49 2013
@@ -138,7 +138,6 @@ public class MapOperator extends Operato
public void setOp(Operator<? extends OperatorDesc> op) {
this.op = op;
}
-
}
private static class MapOpCtx {
@@ -468,8 +467,8 @@ public class MapOperator extends Operato
public void setChildren(Configuration hconf) throws HiveException {
- Path fpath = new Path((new Path(HiveConf.getVar(hconf,
- HiveConf.ConfVars.HADOOPMAPFILENAME))).toUri().getPath());
+ Path fpath = new Path(HiveConf.getVar(hconf,
+ HiveConf.ConfVars.HADOOPMAPFILENAME));
ArrayList<Operator<? extends OperatorDesc>> children =
new ArrayList<Operator<? extends OperatorDesc>>();
@@ -481,7 +480,7 @@ public class MapOperator extends Operato
try {
for (String onefile : conf.getPathToAliases().keySet()) {
MapOpCtx opCtx = initObjectInspector(conf, hconf, onefile, convertedOI);
- Path onepath = new Path(new Path(onefile).toUri().getPath());
+ Path onepath = new Path(onefile);
List<String> aliases = conf.getPathToAliases().get(onefile);
for (String onealias : aliases) {
@@ -513,7 +512,7 @@ public class MapOperator extends Operato
// didn't find match for input file path in configuration!
// serious problem ..
LOG.error("Configuration does not have any alias for path: "
- + fpath.toUri().getPath());
+ + fpath.toUri());
throw new HiveException("Configuration and input path are inconsistent");
}
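
The MapOperator hunks stop stripping paths down to their path component, so the alias lookup keeps the scheme and authority. A small demonstration of what toUri().getPath() discards, assuming Hadoop on the classpath (the path is illustrative):

    import org.apache.hadoop.fs.Path;

    public class PathUriDemo {
      public static void main(String[] args) {
        Path full = new Path("hdfs://nn:8020/warehouse/t/part=1");
        // The old code compared only the path component, losing the filesystem:
        System.out.println(full.toUri().getPath()); // /warehouse/t/part=1
        // Keeping the whole URI leaves two tables with identical directory
        // layouts on different filesystems distinguishable:
        System.out.println(full.toUri());           // hdfs://nn:8020/warehouse/t/part=1
      }
    }
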
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MapredLocalTask.java Fri Apr 26 19:14:49 2013
@@ -58,6 +58,8 @@ import org.apache.hadoop.hive.ql.session
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;
@@ -195,6 +197,14 @@ public class MapredLocalTask extends Tas
// }
+ // Set HADOOP_USER_NAME env variable for the child process, so that
+ // it also runs with hadoop permissions for the user the job is running as.
+ // This will be used by hadoop only in unsecure (non-Kerberos) mode.
+ HadoopShims shim = ShimLoader.getHadoopShims();
+ String endUserName = shim.getShortUserName(shim.getUGIForConf(job));
+ console.printInfo("setting HADOOP_USER_NAME\t" + endUserName);
+ variables.put("HADOOP_USER_NAME", endUserName);
+
if (variables.containsKey(HADOOP_OPTS_KEY)) {
variables.put(HADOOP_OPTS_KEY, variables.get(HADOOP_OPTS_KEY) + hadoopOpts);
} else {
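
The variables map populated above becomes the environment of the child JVM that MapredLocalTask spawns. A sketch of the same effect with plain java.lang.ProcessBuilder, where the command and user name are placeholders:

    import java.io.IOException;
    import java.util.Map;

    public class ChildEnvSketch {
      public static void main(String[] args) throws IOException {
        ProcessBuilder pb = new ProcessBuilder("hadoop", "version");
        Map<String, String> env = pb.environment();
        // In unsecure mode Hadoop trusts this variable as the caller's
        // identity, so the child runs with the submitting user's permissions.
        env.put("HADOOP_USER_NAME", "queryuser"); // placeholder user name
        pb.inheritIO().start();
      }
    }
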
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Fri Apr 26 19:14:49 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.ql.Context;
@@ -46,7 +47,6 @@ import org.apache.hadoop.hive.ql.io.rcfi
import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
-import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -394,7 +394,8 @@ public class MoveTask extends Task<MoveW
}
dc = null; // reset data container to prevent it being added again.
} else { // static partitions
- List<String> partVals = Hive.getPvals(table.getPartCols(), tbd.getPartitionSpec());
+ List<String> partVals = MetaStoreUtils.getPvals(table.getPartCols(),
+ tbd.getPartitionSpec());
db.validatePartitionNameCharacters(partVals);
db.loadPartition(new Path(tbd.getSourceDir()), tbd.getTable().getTableName(),
tbd.getPartitionSpec(), tbd.getReplace(), tbd.getHoldDDLTime(),
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Fri Apr 26 19:14:49 2013
@@ -55,7 +55,6 @@ public abstract class Operator<T extends
// Bean methods
private static final long serialVersionUID = 1L;
- List<OperatorHook> operatorHooks;
private Configuration configuration;
protected List<Operator<? extends OperatorDesc>> childOperators;
@@ -241,17 +240,6 @@ public abstract class Operator<T extends
return id;
}
- public void setOperatorHooks(List<OperatorHook> opHooks){
- operatorHooks = opHooks;
- if (childOperators == null) {
- return;
- }
-
- for (Operator<? extends OperatorDesc> op : childOperators) {
- op.setOperatorHooks(opHooks);
- }
- }
-
public void setReporter(Reporter rep) {
reporter = rep;
@@ -436,34 +424,6 @@ public abstract class Operator<T extends
}
}
- private void enterOperatorHooks(OperatorHookContext opHookContext) throws HiveException {
- if (this.operatorHooks == null) {
- return;
- }
- for(OperatorHook opHook : this.operatorHooks) {
- opHook.enter(opHookContext);
- }
- }
-
- private void exitOperatorHooks(OperatorHookContext opHookContext) throws HiveException {
- if (this.operatorHooks == null) {
- return;
- }
- for(OperatorHook opHook : this.operatorHooks) {
- opHook.exit(opHookContext);
- }
- }
-
- private void closeOperatorHooks(OperatorHookContext opHookContext) throws HiveException {
- if (this.operatorHooks == null) {
- return;
- }
- for(OperatorHook opHook : this.operatorHooks) {
- opHook.close(opHookContext);
- }
- }
-
-
/**
* Collects all the parent's output object inspectors and calls actual
* initialization method.
@@ -525,12 +485,22 @@ public abstract class Operator<T extends
if (fatalError) {
return;
}
- OperatorHookContext opHookContext = new OperatorHookContext(this, row, tag);
- preProcessCounter();
- enterOperatorHooks(opHookContext);
- processOp(row, tag);
- exitOperatorHooks(opHookContext);
- postProcessCounter();
+
+ if (counterNameToEnum != null) {
+ inputRows++;
+ if ((inputRows % 1000) == 0) {
+ incrCounter(numInputRowsCntr, inputRows);
+ incrCounter(timeTakenCntr, totalTime);
+ inputRows = 0;
+ totalTime = 0;
+ }
+
+ beginTime = System.currentTimeMillis();
+ processOp(row, tag);
+ totalTime += (System.currentTimeMillis() - beginTime);
+ } else {
+ processOp(row, tag);
+ }
}
// If a operator wants to do some work at the beginning of a group
@@ -606,13 +576,14 @@ public abstract class Operator<T extends
state = State.CLOSE;
LOG.info(id + " finished. closing... ");
- incrCounter(numInputRowsCntr, inputRows);
- incrCounter(numOutputRowsCntr, outputRows);
- incrCounter(timeTakenCntr, totalTime);
+ if (counterNameToEnum != null) {
+ incrCounter(numInputRowsCntr, inputRows);
+ incrCounter(numOutputRowsCntr, outputRows);
+ incrCounter(timeTakenCntr, totalTime);
+ }
LOG.info(id + " forwarded " + cntr + " rows");
- closeOperatorHooks(new OperatorHookContext(this));
// call the operator specific close routine
closeOp(abort);
@@ -822,9 +793,11 @@ public abstract class Operator<T extends
protected void forward(Object row, ObjectInspector rowInspector)
throws HiveException {
- if ((++outputRows % 1000) == 0) {
- incrCounter(numOutputRowsCntr, outputRows);
- outputRows = 0;
+ if (counterNameToEnum != null) {
+ if ((++outputRows % 1000) == 0) {
+ incrCounter(numOutputRowsCntr, outputRows);
+ outputRows = 0;
+ }
}
if (isLogInfoEnabled) {
@@ -1158,39 +1131,12 @@ public abstract class Operator<T extends
protected transient Object groupKeyObject;
/**
- * this is called before operator process to buffer some counters.
- */
- private void preProcessCounter() {
- inputRows++;
- if ((inputRows % 1000) == 0) {
- incrCounter(numInputRowsCntr, inputRows);
- incrCounter(timeTakenCntr, totalTime);
- inputRows = 0;
- totalTime = 0;
- }
- beginTime = System.currentTimeMillis();
- }
-
- /**
- * this is called after operator process to buffer some counters.
- */
- private void postProcessCounter() {
- if (counterNameToEnum != null) {
- totalTime += (System.currentTimeMillis() - beginTime);
- }
- }
-
- /**
* this is called in operators in map or reduce tasks.
*
* @param name
* @param amount
*/
protected void incrCounter(String name, long amount) {
- if(counterNameToEnum == null) {
- return;
- }
-
String counterName = getWrappedCounterName(name);
ProgressCounter pc = counterNameToEnum.get(counterName);
@@ -1525,6 +1471,15 @@ public abstract class Operator<T extends
return true;
}
+ /*
+ * If this task contains a sortmergejoin, it can be converted to a map-join task if this operator
* is present in the mapper. For example, if a sort-merge join operator is followed by a
* regular join, it cannot be converted to an auto map-join.
+ */
+ public boolean opAllowedBeforeSortMergeJoin() {
+ return true;
+ }
+
public String toString() {
return getName() + "[" + getIdentifier() + "]";
}
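
The inlined process() logic above batches counter traffic: a local accumulator flushes every 1000 rows, and the whole branch is skipped when counters are disabled (counterNameToEnum == null). A stripped-down version of that batching, with a hypothetical reporter interface:

    public class BufferedCounterSketch {
      interface Reporter { void incrCounter(String name, long amount); }

      public static void main(String[] args) {
        Reporter reporter = new Reporter() {
          public void incrCounter(String name, long amount) {
            System.out.println(name + " += " + amount);
          }
        };
        boolean countersEnabled = true; // stands in for counterNameToEnum != null
        long inputRows = 0;
        for (int row = 0; row < 2500; row++) {
          if (countersEnabled) {
            inputRows++;
            if (inputRows % 1000 == 0) { // flush in batches, not per row
              reporter.incrCounter("NUM_INPUT_ROWS", inputRows);
              inputRows = 0;
            }
          }
          // processOp(row, tag) would run here either way.
        }
        if (countersEnabled) {
          reporter.incrCounter("NUM_INPUT_ROWS", inputRows); // flush remainder at close
        }
      }
    }
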
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java Fri Apr 26 19:14:49 2013
@@ -72,14 +72,7 @@ public class PTFOperator extends Operato
hiveConf = new HiveConf(jobConf, PTFOperator.class);
// if the parent is ExtractOperator, this invocation is from reduce-side
Operator<? extends OperatorDesc> parentOp = getParentOperators().get(0);
- if (parentOp instanceof ExtractOperator)
- {
- isMapOperator = false;
- }
- else
- {
- isMapOperator = true;
- }
+ isMapOperator = conf.isMapSide();
reconstructQueryDef(hiveConf);
inputPart = createFirstPartitionForChain(
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java Fri Apr 26 19:14:49 2013
@@ -31,6 +31,7 @@ import java.beans.XMLDecoder;
import java.beans.XMLEncoder;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -261,25 +262,26 @@ public class PTFUtils {
}
}
- public static void makeTransient(Class<?> beanClass, String pdName)
- {
- BeanInfo info;
- try
- {
- info = Introspector.getBeanInfo(beanClass);
- PropertyDescriptor[] propertyDescriptors = info
- .getPropertyDescriptors();
- for (int i = 0; i < propertyDescriptors.length; ++i)
- {
- PropertyDescriptor pd = propertyDescriptors[i];
- if (pd.getName().equals(pdName))
- {
- pd.setValue("transient", Boolean.TRUE);
+ public static void makeTransient(Class<?> beanClass, String... pdNames) {
+ try {
+ BeanInfo info = Introspector.getBeanInfo(beanClass);
+ PropertyDescriptor[] descs = info.getPropertyDescriptors();
+ if (descs == null) {
+ throw new RuntimeException("Cannot access property descriptor for class " + beanClass);
+ }
+ Map<String, PropertyDescriptor> mapping = new HashMap<String, PropertyDescriptor>();
+ for (PropertyDescriptor desc : descs) {
+ mapping.put(desc.getName(), desc);
+ }
+ for (String pdName : pdNames) {
+ PropertyDescriptor desc = mapping.get(pdName);
+ if (desc == null) {
+ throw new RuntimeException("Property " + pdName + " does not exist in " + beanClass);
}
+ desc.setValue("transient", Boolean.TRUE);
}
}
- catch (IntrospectionException ie)
- {
+ catch (IntrospectionException ie) {
throw new RuntimeException(ie);
}
}
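
makeTransient works because java.beans.XMLEncoder honors the "transient" attribute on a cached PropertyDescriptor, which is how these properties get skipped when a plan is serialized. A self-contained demo of the mechanism (the Bean class here is illustrative):

    import java.beans.IntrospectionException;
    import java.beans.Introspector;
    import java.beans.PropertyDescriptor;
    import java.beans.XMLEncoder;
    import java.io.ByteArrayOutputStream;

    public class TransientPropertyDemo {
      public static class Bean {
        private String kept;
        private String skipped;
        public String getKept() { return kept; }
        public void setKept(String v) { kept = v; }
        public String getSkipped() { return skipped; }
        public void setSkipped(String v) { skipped = v; }
      }

      public static void main(String[] args) throws IntrospectionException {
        // Same mechanism as PTFUtils.makeTransient(Bean.class, "skipped"):
        for (PropertyDescriptor pd
            : Introspector.getBeanInfo(Bean.class).getPropertyDescriptors()) {
          if (pd.getName().equals("skipped")) {
            pd.setValue("transient", Boolean.TRUE);
          }
        }
        Bean b = new Bean();
        b.setKept("serialized");
        b.setSkipped("dropped");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        XMLEncoder enc = new XMLEncoder(out);
        enc.writeObject(b);
        enc.close();
        System.out.println(out); // XML mentions "kept" but not "skipped"
      }
    }
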
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Fri Apr 26 19:14:49 2013
@@ -80,6 +80,15 @@ public class ReduceSinkOperator extends
transient byte[] tagByte = new byte[1];
transient protected int numDistributionKeys;
transient protected int numDistinctExprs;
+ transient String inputAlias; // input alias of this RS for join (used for PPD)
+
+ public void setInputAlias(String inputAlias) {
+ this.inputAlias = inputAlias;
+ }
+
+ public String getInputAlias() {
+ return inputAlias;
+ }
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java Fri Apr 26 19:14:49 2013
@@ -80,6 +80,7 @@ public abstract class Task<T extends Ser
// hive.auto.convert.join.noconditionaltask is set to true. No conditional task was
// created in case the mapjoin failed.
public static final int MAPJOIN_ONLY_NOBACKUP = 6;
+ public static final int CONVERTED_SORTMERGEJOIN = 7;
// Descendants tasks who subscribe feeds from this task
protected transient List<Task<? extends Serializable>> feedSubscribers;
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java Fri Apr 26 19:14:49 2013
@@ -163,4 +163,12 @@ public class UnionOperator extends Opera
public boolean opAllowedAfterMapJoin() {
return false;
}
+
+ @Override
+ public boolean opAllowedBeforeSortMergeJoin() {
+ // If a union occurs before the sort-merge join, it is not useful to convert the
+ // sort-merge join to a mapjoin. The number of inputs for the union is more than 1 so
+ // it would be difficult to figure out the big table for the mapjoin.
+ return false;
+ }
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Fri Apr 26 19:14:49 2013
@@ -121,6 +121,8 @@ import org.apache.hadoop.hive.ql.plan.Ma
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
+import org.apache.hadoop.hive.ql.plan.api.Adjacency;
+import org.apache.hadoop.hive.ql.plan.api.Graph;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
@@ -229,6 +231,25 @@ public final class Utilities {
}
}
+ public static void setWorkflowAdjacencies(Configuration conf, QueryPlan plan) {
+ try {
+ Graph stageGraph = plan.getQueryPlan().getStageGraph();
+ if (stageGraph == null)
+ return;
+ List<Adjacency> adjList = stageGraph.getAdjacencyList();
+ if (adjList == null)
+ return;
+ for (Adjacency adj : adjList) {
+ List<String> children = adj.getChildren();
+ if (children == null || children.isEmpty())
+ return;
+ conf.setStrings("mapreduce.workflow.adjacency."+adj.getNode(),
+ children.toArray(new String[children.size()]));
+ }
+ } catch (IOException e) { // swallowed: missing adjacency info must not fail the query
+ }
+ }
+
public static List<String> getFieldSchemaString(List<FieldSchema> fl) {
if (fl == null) {
return null;
@@ -380,7 +401,7 @@ public final class Utilities {
public static String getHiveJobID(Configuration job) {
String planPath = HiveConf.getVar(job, HiveConf.ConfVars.PLAN);
- if (planPath != null) {
+ if (planPath != null && !planPath.isEmpty()) {
return (new Path(planPath)).getName();
}
return null;
@@ -690,11 +711,6 @@ public final class Utilities {
return new PartitionDesc(part, tblDesc);
}
- public static void addMapWork(MapredWork mr, Table tbl, String alias, Operator<?> work) {
- mr.addMapWork(tbl.getDataLocation().getPath(), alias, work, new PartitionDesc(
- getTableDesc(tbl), (LinkedHashMap<String, String>) null));
- }
-
private static String getOpTreeSkel_helper(Operator<?> op, String indent) {
if (op == null) {
return "";
@@ -2420,8 +2436,5 @@ public final class Utilities {
return sb.toString();
}
-
- public static Class getBuiltinUtilsClass() throws ClassNotFoundException {
- return Class.forName("org.apache.hive.builtins.BuiltinUtils");
- }
}
+
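
For context, the new setWorkflowAdjacencies() above publishes the stage graph as mapreduce.workflow.adjacency.<node> properties so downstream tooling can reconstruct the DAG. A self-contained sketch of what ends up in the Configuration, with made-up stage names:

    import org.apache.hadoop.conf.Configuration;

    public class WorkflowAdjacencyDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // mirrors what setWorkflowAdjacencies() would write for a stage
        // "Stage-1" that has two child stages
        conf.setStrings("mapreduce.workflow.adjacency.Stage-1",
            "Stage-2", "Stage-3");

        for (String child : conf.getStrings("mapreduce.workflow.adjacency.Stage-1")) {
          System.out.println("Stage-1 -> " + child);
        }
      }
    }
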
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java Fri Apr 26 19:14:49 2013
@@ -158,8 +158,6 @@ public class HashMapWrapper<K, V> implem
}
public boolean isAbort(long numRows,LogHelper console) {
- System.gc();
- System.gc();
int size = mHash.size();
long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
double rate = (double) usedMemory / (double) maxMemory;
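
Dropping the two System.gc() calls means isAbort() now samples heap occupancy as-is instead of forcing collections first. The check itself reduces to the ratio below; a sketch using the standard memory MXBean, with an illustrative 0.9 threshold:

    import java.lang.management.ManagementFactory;
    import java.lang.management.MemoryMXBean;

    public class HeapPressureCheck {
      private static final MemoryMXBean MEMORY_BEAN =
          ManagementFactory.getMemoryMXBean();

      // true when used heap exceeds 90% of the given budget
      static boolean nearHeapLimit(long maxMemory) {
        long used = MEMORY_BEAN.getHeapMemoryUsage().getUsed();
        return ((double) used / (double) maxMemory) > 0.9;
      }

      public static void main(String[] args) {
        System.out.println("near limit: "
            + nearHeapLimit(Runtime.getRuntime().maxMemory()));
      }
    }
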
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinDoubleKeys.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinDoubleKeys.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinDoubleKeys.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinDoubleKeys.java Fri Apr 26 19:14:49 2013
@@ -23,8 +23,10 @@ import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
-import org.apache.hadoop.hive.ql.exec.MapJoinMetaData;
+import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx;
+import org.apache.hadoop.hive.ql.exec.MapJoinMetaData;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
@@ -93,7 +95,7 @@ public class MapJoinDoubleKeys extends A
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
try {
// get the tableDesc from the map stored in the mapjoin operator
- HashTableSinkObjectCtx ctx = MapJoinMetaData.get(Integer.valueOf(metadataTag));
+ HashTableSinkObjectCtx ctx = MapJoinOperator.getMetadata().get(Integer.valueOf(metadataTag));
Writable val = ctx.getSerDe().getSerializedClass().newInstance();
val.readFields(in);
@@ -124,7 +126,8 @@ public class MapJoinDoubleKeys extends A
try {
// out.writeInt(metadataTag);
// get the tableDesc from the map stored in the mapjoin operator
- HashTableSinkObjectCtx ctx = MapJoinMetaData.get(Integer.valueOf(metadataTag));
+ HashTableSinkObjectCtx ctx = HashTableSinkOperator.getMetadata().get(
+ Integer.valueOf(metadataTag));
ArrayList<Object> list = MapJoinMetaData.getList();
list.add(obj1);
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectKey.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectKey.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectKey.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectKey.java Fri Apr 26 19:14:49 2013
@@ -23,8 +23,9 @@ import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
-import org.apache.hadoop.hive.ql.exec.MapJoinMetaData;
+import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
@@ -95,7 +96,7 @@ public class MapJoinObjectKey extends A
ClassNotFoundException {
try {
// get the tableDesc from the map stored in the mapjoin operator
- HashTableSinkObjectCtx ctx = MapJoinMetaData.get(
+ HashTableSinkObjectCtx ctx = MapJoinOperator.getMetadata().get(
Integer.valueOf(metadataTag));
Writable val = ctx.getSerDe().getSerializedClass().newInstance();
@@ -119,7 +120,7 @@ public class MapJoinObjectKey extends A
public void writeExternal(ObjectOutput out) throws IOException {
try {
// get the tableDesc from the map stored in the mapjoin operator
- HashTableSinkObjectCtx ctx = MapJoinMetaData.get(
+ HashTableSinkObjectCtx ctx = HashTableSinkOperator.getMetadata().get(
Integer.valueOf(metadataTag));
// Different processing for key and value
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java Fri Apr 26 19:14:49 2013
@@ -24,8 +24,10 @@ import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
-import org.apache.hadoop.hive.ql.exec.MapJoinMetaData;
+import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx;
+import org.apache.hadoop.hive.ql.exec.MapJoinMetaData;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
@@ -89,7 +91,8 @@ public class MapJoinObjectValue implemen
metadataTag = in.readInt();
// get the tableDesc from the map stored in the mapjoin operator
- HashTableSinkObjectCtx ctx = MapJoinMetaData.get(Integer.valueOf(metadataTag));
+ HashTableSinkObjectCtx ctx = MapJoinOperator.getMetadata().get(
+ Integer.valueOf(metadataTag));
int sz = in.readInt();
MapJoinRowContainer<Object[]> res = new MapJoinRowContainer<Object[]>();
if (sz > 0) {
@@ -132,7 +135,8 @@ public class MapJoinObjectValue implemen
out.writeInt(metadataTag);
// get the tableDesc from the map stored in the mapjoin operator
- HashTableSinkObjectCtx ctx = MapJoinMetaData.get(Integer.valueOf(metadataTag));
+ HashTableSinkObjectCtx ctx = HashTableSinkOperator.getMetadata().get(
+ Integer.valueOf(metadataTag));
// Different processing for key and value
MapJoinRowContainer<Object[]> v = obj;
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinSingleKey.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinSingleKey.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinSingleKey.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinSingleKey.java Fri Apr 26 19:14:49 2013
@@ -23,8 +23,10 @@ import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
-import org.apache.hadoop.hive.ql.exec.MapJoinMetaData;
+import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx;
+import org.apache.hadoop.hive.ql.exec.MapJoinMetaData;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
@@ -74,10 +76,12 @@ public class MapJoinSingleKey extends Ab
}
@Override
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ public void readExternal(ObjectInput in)
+ throws IOException, ClassNotFoundException {
try {
// get the tableDesc from the map stored in the mapjoin operator
- HashTableSinkObjectCtx ctx = MapJoinMetaData.get(Integer.valueOf(metadataTag));
+ HashTableSinkObjectCtx ctx = MapJoinOperator.getMetadata().get(
+ Integer.valueOf(metadataTag));
Writable val = ctx.getSerDe().getSerializedClass().newInstance();
val.readFields(in);
@@ -106,7 +110,8 @@ public class MapJoinSingleKey extends Ab
try {
// out.writeInt(metadataTag);
// get the tableDesc from the map stored in the mapjoin operator
- HashTableSinkObjectCtx ctx = MapJoinMetaData.get(Integer.valueOf(metadataTag));
+ HashTableSinkObjectCtx ctx = HashTableSinkOperator.getMetadata().get(
+ Integer.valueOf(metadataTag));
ArrayList<Object> list = MapJoinMetaData.getList();
list.add(obj);
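
The same substitution repeats in MapJoinDoubleKeys, MapJoinObjectKey, MapJoinObjectValue and MapJoinSingleKey: readExternal() resolves its context via MapJoinOperator.getMetadata() (the side loading the hash table) while writeExternal() uses HashTableSinkOperator.getMetadata() (the side dumping it), replacing the single shared MapJoinMetaData map. A stripped-down sketch of that two-registry Externalizable pattern; all class names below are illustrative, not Hive's:

    import java.io.Externalizable;
    import java.io.IOException;
    import java.io.ObjectInput;
    import java.io.ObjectOutput;
    import java.util.HashMap;
    import java.util.Map;

    // stand-ins for the per-operator metadata maps
    final class SinkSideMeta {   // populated on the side that writes
      static final Map<Integer, String> CTX = new HashMap<Integer, String>();
    }
    final class JoinSideMeta {   // populated on the side that reads
      static final Map<Integer, String> CTX = new HashMap<Integer, String>();
    }

    public class TaggedKey implements Externalizable {
      private int metadataTag;
      private String key;

      public TaggedKey() { }     // no-arg constructor required by Externalizable

      public void writeExternal(ObjectOutput out) throws IOException {
        // resolve the serialization context on the writing operator's side
        if (SinkSideMeta.CTX.get(metadataTag) == null) {
          throw new IOException("no write-side context for tag " + metadataTag);
        }
        out.writeInt(metadataTag);
        out.writeUTF(key);
      }

      public void readExternal(ObjectInput in) throws IOException {
        metadataTag = in.readInt();
        // resolve the deserialization context on the reading operator's side
        if (JoinSideMeta.CTX.get(metadataTag) == null) {
          throw new IOException("no read-side context for tag " + metadataTag);
        }
        key = in.readUTF();
      }
    }
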
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Fri Apr 26 19:14:49 2013
@@ -243,6 +243,12 @@ public class RCFile {
this.numberRows = numberRows;
}
+ public void nullColumn(int columnIndex) {
+ eachColumnValueLen[columnIndex] = 0;
+ eachColumnUncompressedValueLen[columnIndex] = 0;
+ allCellValLenBuffer[columnIndex] = new NonSyncDataOutputBuffer();
+ }
+
/**
* add in a new column's meta data.
*
@@ -553,6 +559,14 @@ public class RCFile {
}
}
+ public void nullColumn(int columnIndex) {
+ if (codec != null) {
+ compressedColumnsValueBuffer[columnIndex].reset();
+ } else {
+ loadedColumnsValueBuffer[columnIndex].reset();
+ }
+ }
+
public void clearColumnBuffer() throws IOException {
decompressBuffer.reset();
}
@@ -1077,6 +1091,7 @@ public class RCFile {
public int rowReadIndex;
public int runLength;
public int prvLength;
+ public boolean isNulled;
}
private final Path file;
private final FSDataInputStream in;
@@ -1491,6 +1506,7 @@ public class RCFile {
col.rowReadIndex = 0;
col.runLength = 0;
col.prvLength = -1;
+ col.isNulled = colValLenBufferReadIn[selIx].getLength() == 0;
}
return currentKeyLength;
@@ -1694,18 +1710,22 @@ public class RCFile {
SelectedColumn col = selectedColumns[j];
int i = col.colIndex;
- BytesRefWritable ref = ret.unCheckedGet(i);
+ if (col.isNulled) {
+ ret.set(i, null);
+ } else {
+ BytesRefWritable ref = ret.unCheckedGet(i);
- colAdvanceRow(j, col);
+ colAdvanceRow(j, col);
- if (currentValue.decompressedFlag[j]) {
- ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
- col.rowReadIndex, col.prvLength);
- } else {
- ref.set(currentValue.lazyDecompressCallbackObjs[j],
- col.rowReadIndex, col.prvLength);
+ if (currentValue.decompressedFlag[j]) {
+ ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
+ col.rowReadIndex, col.prvLength);
+ } else {
+ ref.set(currentValue.lazyDecompressCallbackObjs[j],
+ col.rowReadIndex, col.prvLength);
+ }
+ col.rowReadIndex += col.prvLength;
}
- col.rowReadIndex += col.prvLength;
}
} else {
// This version of the loop eliminates a condition check and branch
@@ -1714,12 +1734,16 @@ public class RCFile {
SelectedColumn col = selectedColumns[j];
int i = col.colIndex;
- BytesRefWritable ref = ret.unCheckedGet(i);
+ if (col.isNulled) {
+ ret.set(i, null);
+ } else {
+ BytesRefWritable ref = ret.unCheckedGet(i);
- colAdvanceRow(j, col);
- ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
- col.rowReadIndex, col.prvLength);
- col.rowReadIndex += col.prvLength;
+ colAdvanceRow(j, col);
+ ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
+ col.rowReadIndex, col.prvLength);
+ col.rowReadIndex += col.prvLength;
+ }
}
}
rowFetched = true;
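
Taken together, the RCFile changes encode a nulled column as an empty value-length buffer (nullColumn()) and let the reader detect it as col.isNulled, returning null instead of a byte range. A toy sketch of the zero-length convention, with plain byte arrays standing in for the NonSyncDataOutputBuffers:

    import java.util.Arrays;

    public class NullColumnDemo {
      public static void main(String[] args) {
        byte[][] colValueLens = new byte[3][];
        colValueLens[0] = new byte[] {4, 4}; // two 4-byte values
        colValueLens[1] = new byte[0];       // nulled: empty length buffer
        colValueLens[2] = new byte[] {2};

        for (int c = 0; c < colValueLens.length; c++) {
          boolean isNulled = colValueLens[c].length == 0; // mirrors col.isNulled
          System.out.println("col " + c + ": "
              + (isNulled ? "null" : Arrays.toString(colValueLens[c])));
        }
      }
    }
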
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java Fri Apr 26 19:14:49 2013
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.io.orc;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -70,13 +71,23 @@ public final class OrcFile {
*/
public static Writer createWriter(FileSystem fs,
Path path,
+ Configuration conf,
ObjectInspector inspector,
long stripeSize,
CompressionKind compress,
int bufferSize,
int rowIndexStride) throws IOException {
return new WriterImpl(fs, path, inspector, stripeSize, compress,
- bufferSize, rowIndexStride);
+ bufferSize, rowIndexStride, getMemoryManager(conf));
}
+ private static MemoryManager memoryManager = null;
+
+ private static synchronized
+ MemoryManager getMemoryManager(Configuration conf) {
+ if (memoryManager == null) {
+ memoryManager = new MemoryManager(conf);
+ }
+ return memoryManager;
+ }
}
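
getMemoryManager() above is a lazily created, JVM-wide singleton behind a synchronized accessor, so all ORC writers in a process share one MemoryManager. The pattern in isolation, with an illustrative class name:

    public final class SharedResource {
      private static SharedResource instance = null;

      private SharedResource() { }

      // synchronized so two concurrent first callers cannot both create one
      public static synchronized SharedResource get() {
        if (instance == null) {
          instance = new SharedResource();
        }
        return instance;
      }
    }
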
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java Fri Apr 26 19:14:49 2013
@@ -71,8 +71,8 @@ public class OrcOutputFormat extends Fil
public void write(NullWritable nullWritable,
OrcSerdeRow row) throws IOException {
if (writer == null) {
- writer = OrcFile.createWriter(fs, path, row.getInspector(), stripeSize,
- compress, compressionSize, rowIndexStride);
+ writer = OrcFile.createWriter(fs, path, this.conf, row.getInspector(),
+ stripeSize, compress, compressionSize, rowIndexStride);
}
writer.addRow(row.getRow());
}
@@ -81,8 +81,9 @@ public class OrcOutputFormat extends Fil
public void write(Writable row) throws IOException {
OrcSerdeRow serdeRow = (OrcSerdeRow) row;
if (writer == null) {
- writer = OrcFile.createWriter(fs, path, serdeRow.getInspector(),
- stripeSize, compress, compressionSize, rowIndexStride);
+ writer = OrcFile.createWriter(fs, path, this.conf,
+ serdeRow.getInspector(), stripeSize, compress, compressionSize,
+ rowIndexStride);
}
writer.addRow(serdeRow.getRow());
}
@@ -101,8 +102,8 @@ public class OrcOutputFormat extends Fil
ObjectInspector inspector = ObjectInspectorFactory.
getStandardStructObjectInspector(new ArrayList<String>(),
new ArrayList<ObjectInspector>());
- writer = OrcFile.createWriter(fs, path, inspector, stripeSize,
- compress, compressionSize, rowIndexStride);
+ writer = OrcFile.createWriter(fs, path, this.conf, inspector,
+ stripeSize, compress, compressionSize, rowIndexStride);
}
writer.close();
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java Fri Apr 26 19:14:49 2013
@@ -17,6 +17,13 @@
*/
package org.apache.hadoop.hive.ql.io.orc;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -31,16 +38,9 @@ import org.apache.hadoop.hive.serde2.typ
import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
import org.apache.hadoop.io.Writable;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
final class OrcStruct implements Writable {
- private final Object[] fields;
+ private Object[] fields;
OrcStruct(int children) {
fields = new Object[children];
@@ -54,6 +54,14 @@ final class OrcStruct implements Writabl
fields[fieldIndex] = value;
}
+ public int getNumFields() {
+ return fields.length;
+ }
+
+ public void setNumFields(int numFields) {
+ fields = new Object[numFields];
+ }
+
@Override
public void write(DataOutput dataOutput) throws IOException {
throw new UnsupportedOperationException("write unsupported");
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Fri Apr 26 19:14:49 2013
@@ -733,8 +733,9 @@ class RecordReaderImpl implements Record
} else {
length = dictionaryBuffer.size() - offset;
}
- // If the column is just empty strings, the size will be zero, so the buffer will be null,
- // in that case just return result as it will default to empty
+ // If the column is just empty strings, the size will be zero,
+ // so the buffer will be null, in that case just return result
+ // as it will default to empty
if (dictionaryBuffer != null) {
dictionaryBuffer.setText(result, offset, length);
} else {
@@ -788,6 +789,13 @@ class RecordReaderImpl implements Record
result = new OrcStruct(fields.length);
} else {
result = (OrcStruct) previous;
+
+ // If the input format was initialized with a file with a
+ // different number of fields, the number of fields needs to
+ // be updated to the correct number
+ if (result.getNumFields() != fields.length) {
+ result.setNumFields(fields.length);
+ }
}
for(int i=0; i < fields.length; ++i) {
if (fields[i] != null) {
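
Because the reader reuses the row object passed in as previous, the new getNumFields()/setNumFields() pair on OrcStruct lets next() re-size a struct that was created against a file with a different column count. A compact sketch of that reuse path; MiniStruct and ReuseDemo are illustrative:

    public class MiniStruct {
      private Object[] fields;

      MiniStruct(int children) { fields = new Object[children]; }

      int getNumFields() { return fields.length; }
      void setNumFields(int n) { fields = new Object[n]; }
    }

    class ReuseDemo {
      static MiniStruct next(MiniStruct previous, int schemaFieldCount) {
        MiniStruct result = (previous == null)
            ? new MiniStruct(schemaFieldCount)
            : previous;
        // a reused object may carry the width of a differently-shaped file
        if (result.getNumFields() != schemaFieldCount) {
          result.setNumFields(schemaFieldCount);
        }
        return result;
      }
    }
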
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Fri Apr 26 19:14:49 2013
@@ -18,8 +18,15 @@
package org.apache.hadoop.hive.ql.io.orc;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.CodedOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -44,14 +51,8 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
import org.apache.hadoop.io.BytesWritable;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
/**
* An ORC file writer. The file is divided into stripes, which is the natural
@@ -62,7 +63,7 @@ import java.util.TreeMap;
* sub-types. Each of the TreeWriters writes the column's data as a set of
* streams.
*/
-class WriterImpl implements Writer {
+class WriterImpl implements Writer, MemoryManager.Callback {
private static final int HDFS_BUFFER_SIZE = 256 * 1024;
private static final int MIN_ROW_INDEX_STRIDE = 1000;
@@ -97,6 +98,7 @@ class WriterImpl implements Writer {
private final OrcProto.RowIndex.Builder rowIndex =
OrcProto.RowIndex.newBuilder();
private final boolean buildIndex;
+ private final MemoryManager memoryManager;
WriterImpl(FileSystem fs,
Path path,
@@ -104,13 +106,15 @@ class WriterImpl implements Writer {
long stripeSize,
CompressionKind compress,
int bufferSize,
- int rowIndexStride) throws IOException {
+ int rowIndexStride,
+ MemoryManager memoryManager) throws IOException {
this.fs = fs;
this.path = path;
this.stripeSize = stripeSize;
this.compress = compress;
this.bufferSize = bufferSize;
this.rowIndexStride = rowIndexStride;
+ this.memoryManager = memoryManager;
buildIndex = rowIndexStride > 0;
codec = createCodec(compress);
treeWriter = createTreeWriter(inspector, streamFactory, false);
@@ -118,6 +122,8 @@ class WriterImpl implements Writer {
throw new IllegalArgumentException("Row stride must be at least " +
MIN_ROW_INDEX_STRIDE);
}
+ // ensure that we are able to handle callbacks before we register ourselves
+ memoryManager.addWriter(path, stripeSize, this);
}
static CompressionCodec createCodec(CompressionKind kind) {
@@ -147,6 +153,13 @@ class WriterImpl implements Writer {
}
}
+ @Override
+ public void checkMemory(double newScale) throws IOException {
+ if (estimateStripeSize() > Math.round(stripeSize * newScale)) {
+ flushStripe();
+ }
+ }
+
/**
* This class is used to hold the contents of streams as they are buffered.
* The TreeWriters write to the outStream and the codec compresses the
@@ -734,19 +747,8 @@ class WriterImpl implements Writer {
int length = rows.size();
int rowIndexEntry = 0;
OrcProto.RowIndex.Builder rowIndex = getRowIndex();
- // need to build the first index entry out here, to handle the case of
- // not having any values.
- if (buildIndex) {
- while (0 == rowIndexValueCount.get(rowIndexEntry) &&
- rowIndexEntry < savedRowIndex.size()) {
- OrcProto.RowIndexEntry.Builder base =
- savedRowIndex.get(rowIndexEntry++).toBuilder();
- rowOutput.getPosition(new RowIndexPositionRecorder(base));
- rowIndex.addEntry(base.build());
- }
- }
// write the values translated into the dump order.
- for(int i = 0; i < length; ++i) {
+ for(int i = 0; i <= length; ++i) {
// now that we are writing out the row values, we can finalize the
// row index
if (buildIndex) {
@@ -758,7 +760,9 @@ class WriterImpl implements Writer {
rowIndex.addEntry(base.build());
}
}
- rowOutput.write(dumpOrder[rows.get(i)]);
+ if (i != length) {
+ rowOutput.write(dumpOrder[rows.get(i)]);
+ }
}
// we need to build the rowindex before calling super, since it
// writes it out.
@@ -1453,8 +1457,8 @@ class WriterImpl implements Writer {
}
}
// once every 1000 rows, check the size to see if we should spill
- if (rowsInStripe % 1000 == 0 && estimateStripeSize() > stripeSize) {
- flushStripe();
+ if (rowsInStripe % 1000 == 0) {
+ checkMemory(memoryManager.getAllocationScale());
}
}
@@ -1464,5 +1468,6 @@ class WriterImpl implements Writer {
int footerLength = writeFooter(rawWriter.getPos());
rawWriter.writeByte(writePostScript(footerLength));
rawWriter.close();
+ memoryManager.removeWriter(path);
}
}
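
Net effect of the WriterImpl changes: the writer registers itself with the MemoryManager at construction and, every 1000 rows, compares its estimated stripe size against stripeSize scaled by the manager's current allocation factor, flushing early under memory pressure instead of using the fixed threshold. A self-contained sketch of that callback protocol, with illustrative types and sizes:

    import java.io.IOException;

    interface FlushCallback {
      void checkMemory(double newScale) throws IOException;
    }

    class SketchWriter implements FlushCallback {
      private final long stripeSize = 256L * 1024 * 1024; // illustrative budget
      private long estimatedBytes = 0;
      private long rowsInStripe = 0;

      @Override
      public void checkMemory(double newScale) throws IOException {
        // flush early once the scaled budget is exceeded
        if (estimatedBytes > Math.round(stripeSize * newScale)) {
          flushStripe();
        }
      }

      void addRow(int rowBytes, double currentScale) throws IOException {
        estimatedBytes += rowBytes;
        rowsInStripe++;
        if (rowsInStripe % 1000 == 0) { // same cadence as the diff above
          checkMemory(currentScale);
        }
      }

      private void flushStripe() {
        estimatedBytes = 0;
        rowsInStripe = 0;
      }
    }
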