Posted to commits@hive.apache.org by ha...@apache.org on 2013/07/31 00:22:46 UTC
svn commit: r1508669 [4/39] - in /hive/branches/vectorization: ./
common/src/java/org/apache/hadoop/hive/conf/ conf/
contrib/src/test/results/clientpositive/ data/files/ eclipse-templates/
hcatalog/build-support/ant/ hcatalog/core/src/main/java/org/apa...
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Tue Jul 30 22:22:35 2013
@@ -22,6 +22,7 @@ import java.beans.DefaultPersistenceDele
import java.beans.Encoder;
import java.beans.ExceptionListener;
import java.beans.Expression;
+import java.beans.PersistenceDelegate;
import java.beans.Statement;
import java.beans.XMLDecoder;
import java.beans.XMLEncoder;
@@ -50,12 +51,14 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.SQLTransientException;
+import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -111,6 +114,7 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
@@ -118,20 +122,24 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
-import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.api.Adjacency;
import org.apache.hadoop.hive.ql.plan.api.Graph;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBaseCompare;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotEqual;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDeException;
@@ -166,6 +174,8 @@ public final class Utilities {
*/
public static String HADOOP_LOCAL_FS = "file:///";
+ public static String MAP_PLAN_NAME = "map.xml";
+ public static String REDUCE_PLAN_NAME = "reduce.xml";
/**
* ReduceField:
@@ -188,56 +198,85 @@ public final class Utilities {
// prevent instantiation
}
- private static Map<String, MapredWork> gWorkMap = Collections
- .synchronizedMap(new HashMap<String, MapredWork>());
+ private static Map<Path, BaseWork> gWorkMap = Collections
+ .synchronizedMap(new HashMap<Path, BaseWork>());
private static final Log LOG = LogFactory.getLog(Utilities.class.getName());
- public static void clearMapRedWork(Configuration job) {
+ public static void clearWork(Configuration conf) {
+ Path mapPath = getPlanPath(conf, MAP_PLAN_NAME);
+ Path reducePath = getPlanPath(conf, REDUCE_PLAN_NAME);
+
+ // if the plan path hasn't been initialized just return, nothing to clean.
+ if (mapPath == null || reducePath == null) {
+ return;
+ }
+
try {
- Path planPath = new Path(HiveConf.getVar(job, HiveConf.ConfVars.PLAN));
- FileSystem fs = planPath.getFileSystem(job);
- if (fs.exists(planPath)) {
- try {
- fs.delete(planPath, true);
- } catch (IOException e) {
- e.printStackTrace();
- }
+ FileSystem fs = mapPath.getFileSystem(conf);
+ if (fs.exists(mapPath)) {
+ fs.delete(mapPath, true);
}
+ if (fs.exists(reducePath)) {
+ fs.delete(reducePath, true);
+ }
+
} catch (Exception e) {
+ LOG.warn("Failed to clean-up tmp directories.", e);
} finally {
// where a single process works with multiple plans - we must clear
// the cache before working with the next plan.
- String jobID = getHiveJobID(job);
- if (jobID != null) {
- gWorkMap.remove(jobID);
+ if (mapPath != null) {
+ gWorkMap.remove(mapPath);
+ }
+ if (reducePath != null) {
+ gWorkMap.remove(reducePath);
}
}
}
- public static MapredWork getMapRedWork(Configuration job) {
- MapredWork gWork = null;
+ public static MapredWork getMapRedWork(Configuration conf) {
+ MapredWork w = new MapredWork();
+ w.setMapWork(getMapWork(conf));
+ w.setReduceWork(getReduceWork(conf));
+ return w;
+ }
+
+ public static MapWork getMapWork(Configuration conf) {
+ return (MapWork) getBaseWork(conf, MAP_PLAN_NAME);
+ }
+
+ public static ReduceWork getReduceWork(Configuration conf) {
+ return (ReduceWork) getBaseWork(conf, REDUCE_PLAN_NAME);
+ }
+
+ public static BaseWork getBaseWork(Configuration conf, String name) {
+ BaseWork gWork = null;
+ Path path = null;
try {
- String jobID = getHiveJobID(job);
- assert jobID != null;
- gWork = gWorkMap.get(jobID);
+ path = getPlanPath(conf, name);
+ assert path != null;
+ gWork = gWorkMap.get(path);
if (gWork == null) {
- String jtConf = ShimLoader.getHadoopShims().getJobLauncherRpcAddress(job);
- String path;
+ String jtConf = ShimLoader.getHadoopShims().getJobLauncherRpcAddress(conf);
+ Path localPath;
if (jtConf.equals("local")) {
- String planPath = HiveConf.getVar(job, HiveConf.ConfVars.PLAN);
- path = new Path(planPath).toUri().getPath();
+ localPath = path;
} else {
- path = "HIVE_PLAN" + jobID;
+ localPath = new Path(name);
}
- InputStream in = new FileInputStream(path);
- MapredWork ret = deserializeMapRedWork(in, job);
+ InputStream in = new FileInputStream(localPath.toUri().getPath());
+ BaseWork ret = deserializeObject(in);
gWork = ret;
- gWork.initialize();
- gWorkMap.put(jobID, gWork);
+ gWorkMap.put(path, gWork);
}
- return (gWork);
+ return gWork;
+ } catch (FileNotFoundException fnf) {
+ // happens. e.g.: no reduce work.
+ LOG.debug("No plan file found: "+path);
+ return null;
} catch (Exception e) {
e.printStackTrace();
+ LOG.error("Failed to load plan: "+path, e);
throw new RuntimeException(e);
}
}
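
The helpers above replace the single cached MapredWork with per-plan BaseWork objects keyed by plan Path: map.xml and reduce.xml are loaded and cached independently, and a missing reduce.xml simply means a map-only job. A minimal task-side sketch of the intended usage (illustrative only; it assumes the plan files were written with the setMapWork/setReduceWork helpers added further down in this diff):

    // Sketch: loading the split plans on the task side (Hive classes assumed on the classpath;
    // "job" is the JobConf the MR framework hands to the task, with the plan path already set).
    MapWork mapWork = Utilities.getMapWork(job);           // deserializes and caches map.xml
    ReduceWork reduceWork = Utilities.getReduceWork(job);  // null when there is no reduce.xml (map-only job)
    if (reduceWork != null) {
      Operator<?> reducer = reduceWork.getReducer();       // root of the reduce-side operator pipeline
    }
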
@@ -376,47 +415,115 @@ public final class Utilities {
}
- public static void setMapRedWork(Configuration job, MapredWork w, String hiveScratchDir) {
+ /**
+ * DatePersistenceDelegate. Needed to serialize java.util.Date
+ * since it is not serialization friendly.
+ * Also works for java.sql.Date since it derives from java.util.Date.
+ */
+ public static class DatePersistenceDelegate extends PersistenceDelegate {
+
+ @Override
+ protected Expression instantiate(Object oldInstance, Encoder out) {
+ Date dateVal = (Date)oldInstance;
+ Object[] args = { dateVal.getTime() };
+ return new Expression(dateVal, dateVal.getClass(), "new", args);
+ }
+
+ protected boolean mutatesTo(Object oldInstance, Object newInstance) {
+ if (oldInstance == null || newInstance == null) {
+ return false;
+ }
+ return oldInstance.getClass() == newInstance.getClass();
+ }
+ }
+
+ /**
+ * TimestampPersistenceDelegate. Needed to serialize java.sql.Timestamp since
+ * it is not serialization friendly.
+ */
+ public static class TimestampPersistenceDelegate extends DatePersistenceDelegate {
+ protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
+ Timestamp ts = (Timestamp)oldInstance;
+ Object[] args = { ts.getNanos() };
+ Statement stmt = new Statement(oldInstance, "setNanos", args);
+ out.writeStatement(stmt);
+ }
+ }
+
+ public static void setMapRedWork(Configuration conf, MapredWork w, String hiveScratchDir) {
+ setMapWork(conf, w.getMapWork(), hiveScratchDir);
+ if (w.getReduceWork() != null) {
+ setReduceWork(conf, w.getReduceWork(), hiveScratchDir);
+ }
+ }
+
+ public static void setMapWork(Configuration conf, MapWork w, String hiveScratchDir) {
+ setBaseWork(conf, w, hiveScratchDir, MAP_PLAN_NAME);
+ }
+
+ public static void setReduceWork(Configuration conf, ReduceWork w, String hiveScratchDir) {
+ setBaseWork(conf, w, hiveScratchDir, REDUCE_PLAN_NAME);
+ }
+
+ private static void setBaseWork(Configuration conf, BaseWork w, String hiveScratchDir, String name) {
try {
+ setPlanPath(conf, hiveScratchDir);
- // this is the unique job ID, which is kept in JobConf as part of the plan file name
- String jobID = UUID.randomUUID().toString();
- Path planPath = new Path(hiveScratchDir, jobID);
- HiveConf.setVar(job, HiveConf.ConfVars.PLAN, planPath.toUri().toString());
+ Path planPath = getPlanPath(conf, name);
- // use the default file system of the job
- FileSystem fs = planPath.getFileSystem(job);
+ // use the default file system of the conf
+ FileSystem fs = planPath.getFileSystem(conf);
FSDataOutputStream out = fs.create(planPath);
- serializeMapRedWork(w, out);
+ serializeObject(w, out);
// Serialize the plan to the default hdfs instance
// Except for hadoop local mode execution where we should be
// able to get the plan directly from the cache
- if (!ShimLoader.getHadoopShims().isLocalMode(job)) {
+ if (!ShimLoader.getHadoopShims().isLocalMode(conf)) {
// Set up distributed cache
- DistributedCache.createSymlink(job);
- String uriWithLink = planPath.toUri().toString() + "#HIVE_PLAN" + jobID;
- DistributedCache.addCacheFile(new URI(uriWithLink), job);
+ if (!DistributedCache.getSymlink(conf)) {
+ DistributedCache.createSymlink(conf);
+ }
+ String uriWithLink = planPath.toUri().toString() + "#" + name;
+ DistributedCache.addCacheFile(new URI(uriWithLink), conf);
// set replication of the plan file to a high number. we use the same
// replication factor as used by the hadoop jobclient for job.xml etc.
- short replication = (short) job.getInt("mapred.submit.replication", 10);
+ short replication = (short) conf.getInt("mapred.submit.replication", 10);
fs.setReplication(planPath, replication);
}
// Cache the plan in this process
- w.initialize();
- gWorkMap.put(jobID, w);
+ gWorkMap.put(planPath, w);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
- public static String getHiveJobID(Configuration job) {
- String planPath = HiveConf.getVar(job, HiveConf.ConfVars.PLAN);
- if (planPath != null && !planPath.isEmpty()) {
- return (new Path(planPath)).getName();
+ private static Path getPlanPath(Configuration conf, String name) {
+ Path planPath = getPlanPath(conf);
+ if (planPath == null) {
+ return null;
+ }
+ return new Path(planPath, name);
+ }
+
+ private static void setPlanPath(Configuration conf, String hiveScratchDir) throws IOException {
+ if (getPlanPath(conf) == null) {
+ // this is the unique conf ID, which is kept in JobConf as part of the plan file name
+ String jobID = UUID.randomUUID().toString();
+ Path planPath = new Path(hiveScratchDir, jobID);
+ FileSystem fs = planPath.getFileSystem(conf);
+ fs.mkdirs(planPath);
+ HiveConf.setVar(conf, HiveConf.ConfVars.PLAN, planPath.toUri().toString());
+ }
+ }
+
+ private static Path getPlanPath(Configuration conf) {
+ String plan = HiveConf.getVar(conf, HiveConf.ConfVars.PLAN);
+ if (plan != null && !plan.isEmpty()) {
+ return new Path(plan);
}
return null;
}
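
With setPlanPath and getPlanPath above, the location kept in HiveConf.ConfVars.PLAN is no longer a single file but a per-query directory under the scratch dir, and setBaseWork writes one XML file per plan fragment into it. A small sketch of the resulting layout, using an illustrative scratch directory (the real value comes from the Hive configuration):

    // Sketch of the plan-file layout produced by setPlanPath + setBaseWork (paths are illustrative).
    String hiveScratchDir = "hdfs:///tmp/hive-user/scratch";         // assumption, not a Hive default
    String jobID = UUID.randomUUID().toString();
    Path planDir    = new Path(hiveScratchDir, jobID);               // stored in HiveConf.ConfVars.PLAN
    Path mapPlan    = new Path(planDir, Utilities.MAP_PLAN_NAME);    // .../<uuid>/map.xml
    Path reducePlan = new Path(planDir, Utilities.REDUCE_PLAN_NAME); // .../<uuid>/reduce.xml
    // On a real cluster each plan file is also added to the DistributedCache with a
    // "#map.xml" / "#reduce.xml" symlink so tasks can open it by plan name locally.
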
@@ -424,6 +531,8 @@ public final class Utilities {
public static String serializeExpression(ExprNodeDesc expr) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
XMLEncoder encoder = new XMLEncoder(baos);
+ encoder.setPersistenceDelegate(java.sql.Date.class, new DatePersistenceDelegate());
+ encoder.setPersistenceDelegate(Timestamp.class, new TimestampPersistenceDelegate());
try {
encoder.writeObject(expr);
} finally {
@@ -455,26 +564,6 @@ public final class Utilities {
}
}
- /**
- * Serialize a single Task.
- */
- public static void serializeTasks(Task<? extends Serializable> t, OutputStream out) {
- XMLEncoder e = null;
- try {
- e = new XMLEncoder(out);
- // workaround for java 1.5
- e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate());
- e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate());
- e.setPersistenceDelegate(Operator.ProgressCounter.class, new EnumDelegate());
-
- e.writeObject(t);
- } finally {
- if (null != e) {
- e.close();
- }
- }
- }
-
public static class CollectionPersistenceDelegate extends DefaultPersistenceDelegate {
@Override
protected Expression instantiate(Object oldInstance, Encoder out) {
@@ -491,102 +580,40 @@ public final class Utilities {
}
/**
- * Serialize the whole query plan.
+ * Serialize the object. This helper function mainly makes sure that enums,
+ * counters, etc are handled properly.
*/
- public static void serializeQueryPlan(QueryPlan plan, OutputStream out) {
+ public static void serializeObject(Object plan, OutputStream out) {
XMLEncoder e = new XMLEncoder(out);
e.setExceptionListener(new ExceptionListener() {
public void exceptionThrown(Exception e) {
LOG.warn(org.apache.hadoop.util.StringUtils.stringifyException(e));
- throw new RuntimeException("Cannot serialize the query plan", e);
+ throw new RuntimeException("Cannot serialize object", e);
}
});
// workaround for java 1.5
e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate());
e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate());
e.setPersistenceDelegate(Operator.ProgressCounter.class, new EnumDelegate());
+ e.setPersistenceDelegate(java.sql.Date.class, new DatePersistenceDelegate());
+ e.setPersistenceDelegate(Timestamp.class, new TimestampPersistenceDelegate());
- e.setPersistenceDelegate(org.datanucleus.sco.backed.Map.class, new MapDelegate());
- e.setPersistenceDelegate(org.datanucleus.sco.backed.List.class, new ListDelegate());
+ e.setPersistenceDelegate(org.datanucleus.store.types.backed.Map.class, new MapDelegate());
+ e.setPersistenceDelegate(org.datanucleus.store.types.backed.List.class, new ListDelegate());
e.writeObject(plan);
e.close();
}
/**
- * Deserialize the whole query plan.
- */
- public static QueryPlan deserializeQueryPlan(InputStream in, Configuration conf) {
- XMLDecoder d = null;
- try {
- d = new XMLDecoder(in, null, null);
- QueryPlan ret = (QueryPlan) d.readObject();
- return (ret);
- } finally {
- if (null != d) {
- d.close();
- }
- }
- }
-
- /**
- * Serialize the mapredWork object to an output stream. DO NOT use this to write to standard
- * output since it closes the output stream. DO USE mapredWork.toXML() instead.
- */
- public static void serializeMapRedWork(MapredWork w, OutputStream out) {
- XMLEncoder e = null;
- try {
- e = new XMLEncoder(out);
- // workaround for java 1.5
- e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate());
- e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate());
- e.writeObject(w);
- } finally {
- if (null != e) {
- e.close();
- }
- }
-
- }
-
- public static MapredWork deserializeMapRedWork(InputStream in, Configuration conf) {
- XMLDecoder d = null;
- try {
- d = new XMLDecoder(in, null, null);
- MapredWork ret = (MapredWork) d.readObject();
- return (ret);
- } finally {
- if (null != d) {
- d.close();
- }
- }
- }
-
- /**
- * Serialize the mapredLocalWork object to an output stream. DO NOT use this to write to standard
- * output since it closes the output stream. DO USE mapredWork.toXML() instead.
+ * De-serialize an object. This helper function mainly makes sure that enums,
+ * counters, etc are handled properly.
*/
- public static void serializeMapRedLocalWork(MapredLocalWork w, OutputStream out) {
- XMLEncoder e = null;
- try {
- e = new XMLEncoder(out);
- // workaround for java 1.5
- e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate());
- e.setPersistenceDelegate(GroupByDesc.Mode.class, new EnumDelegate());
- e.writeObject(w);
- } finally {
- if (null != e) {
- e.close();
- }
- }
- }
-
- public static MapredLocalWork deserializeMapRedLocalWork(InputStream in, Configuration conf) {
+ public static <T> T deserializeObject(InputStream in) {
XMLDecoder d = null;
try {
d = new XMLDecoder(in, null, null);
- MapredLocalWork ret = (MapredLocalWork) d.readObject();
- return (ret);
+ return (T) d.readObject();
} finally {
if (null != d) {
d.close();
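
The per-type serializeMapRedWork/serializeMapRedLocalWork/deserialize* methods collapse into this single generic pair, with the Date and Timestamp persistence delegates registered so java.sql values survive XMLEncoder. A minimal round trip, sketched under the assumption that the Hive jars are on the classpath and the usual java.io/java.sql imports are present:

    // Round-trip sketch for the new generic helpers (illustrative; not part of this commit).
    Timestamp before = new Timestamp(System.currentTimeMillis());
    before.setNanos(123456789);                            // restored by TimestampPersistenceDelegate
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    Utilities.serializeObject(before, bos);                // XMLEncoder + the delegates registered above
    InputStream in = new ByteArrayInputStream(bos.toByteArray());
    Timestamp after = Utilities.deserializeObject(in);     // type inferred from the assignment target
    // after.equals(before) is expected to hold; without the delegates XMLEncoder could not even
    // instantiate java.sql.Timestamp, which has no no-arg constructor.

The call sites updated later in this commit cast the result explicitly instead, which is equivalent.
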
@@ -1765,7 +1792,7 @@ public final class Utilities {
* @return the summary of all the input paths.
* @throws IOException
*/
- public static ContentSummary getInputSummary(Context ctx, MapredWork work, PathFilter filter)
+ public static ContentSummary getInputSummary(Context ctx, MapWork work, PathFilter filter)
throws IOException {
long[] summary = {0, 0, 0};
@@ -2129,17 +2156,45 @@ public final class Utilities {
/**
* Check if a function can be pushed down to JDO.
- * Now only {=, AND, OR} are supported.
+ * Now only {compares, AND, OR} are supported.
* @param func a generic function.
* @return true if this function can be pushed down to JDO filter.
*/
private static boolean supportedJDOFuncs(GenericUDF func) {
+ // TODO: we might also want to add "not" and "between" here in future.
+ // TODO: change to GenericUDFBaseCompare once DN is upgraded
+ // (see HIVE-2609 - in DN 2.0, substrings do not work in MySQL).
return func instanceof GenericUDFOPEqual ||
+ func instanceof GenericUDFOPNotEqual ||
func instanceof GenericUDFOPAnd ||
func instanceof GenericUDFOPOr;
}
/**
+ * Check if a function can be pushed down to JDO for integral types.
+ * Only {=, !=} are supported. lt/gt/etc. to be dealt with in HIVE-4888.
+ * @param func a generic function.
+ * @return true iff this function can be pushed down to JDO filter for integral types.
+ */
+ private static boolean doesJDOFuncSupportIntegral(GenericUDF func) {
+ // AND, OR etc. don't need to be specified here.
+ return func instanceof GenericUDFOPEqual ||
+ func instanceof GenericUDFOPNotEqual;
+ }
+
+ /**
+ * @param type type
+ * @param constant The constant, if any.
+ * @return true iff type is an integral type.
+ */
+ private static boolean isIntegralType(String type) {
+ return type.equals(serdeConstants.TINYINT_TYPE_NAME) ||
+ type.equals(serdeConstants.SMALLINT_TYPE_NAME) ||
+ type.equals(serdeConstants.INT_TYPE_NAME) ||
+ type.equals(serdeConstants.BIGINT_TYPE_NAME);
+ }
+
+ /**
* Check if the partition pruning expression can be pushed down to JDO filtering.
* The partition expression contains only partition columns.
* The criteria that an expression can be pushed down are that:
@@ -2149,32 +2204,47 @@ public final class Utilities {
* restriction by the current JDO filtering implementation.
* @param tab The table that contains the partition columns.
* @param expr the partition pruning expression
+ * @param parent parent UDF of expr if parent exists and contains a UDF; otherwise null.
* @return null if the partition pruning expression can be pushed down to JDO filtering.
*/
- public static String checkJDOPushDown(Table tab, ExprNodeDesc expr) {
- if (expr instanceof ExprNodeConstantDesc) {
- // JDO filter now only support String typed literal -- see Filter.g and ExpressionTree.java
+ public static String checkJDOPushDown(
+ Table tab, ExprNodeDesc expr, GenericUDF parent) {
+ boolean isConst = expr instanceof ExprNodeConstantDesc;
+ boolean isCol = !isConst && (expr instanceof ExprNodeColumnDesc);
+ boolean isIntegralSupported = (parent != null) && (isConst || isCol)
+ && doesJDOFuncSupportIntegral(parent);
+
+ // JDO filter now only support String typed literals, as well as integers
+ // for some operators; see Filter.g and ExpressionTree.java.
+ if (isConst) {
Object value = ((ExprNodeConstantDesc)expr).getValue();
if (value instanceof String) {
return null;
}
- return "Constant " + value + " is not string type";
- } else if (expr instanceof ExprNodeColumnDesc) {
- // JDO filter now only support String typed literal -- see Filter.g and ExpressionTree.java
+ if (isIntegralSupported && isIntegralType(expr.getTypeInfo().getTypeName())) {
+ return null;
+ }
+ return "Constant " + value + " is not string "
+ + (isIntegralSupported ? "or integral ": "") + "type: " + expr.getTypeInfo().getTypeName();
+ } else if (isCol) {
TypeInfo type = expr.getTypeInfo();
- if (type.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)) {
+ if (type.getTypeName().equals(serdeConstants.STRING_TYPE_NAME)
+ || (isIntegralSupported && isIntegralType(type.getTypeName()))) {
String colName = ((ExprNodeColumnDesc)expr).getColumn();
for (FieldSchema fs: tab.getPartCols()) {
if (fs.getName().equals(colName)) {
- if (fs.getType().equals(serdeConstants.STRING_TYPE_NAME)) {
+ if (fs.getType().equals(serdeConstants.STRING_TYPE_NAME)
+ || (isIntegralSupported && isIntegralType(fs.getType()))) {
return null;
}
- return "Partition column " + fs.getName() + " is not string type";
+ return "Partition column " + fs.getName() + " is not string "
+ + (isIntegralSupported ? "or integral ": "") + "type: " + fs.getType();
}
}
assert(false); // cannot find the partition column!
} else {
- return "Column " + expr.getExprString() + " is not string type";
+ return "Column " + expr.getExprString() + " is not string "
+ + (isIntegralSupported ? "or integral ": "") + "type: " + type.getTypeName();
}
} else if (expr instanceof ExprNodeGenericFuncDesc) {
ExprNodeGenericFuncDesc funcDesc = (ExprNodeGenericFuncDesc) expr;
@@ -2188,7 +2258,7 @@ public final class Utilities {
if (!(child instanceof ExprNodeConstantDesc)) {
allChildrenConstant = false;
}
- String message = checkJDOPushDown(tab, child);
+ String message = checkJDOPushDown(tab, child, func);
if (message != null) {
return message;
}
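
The hunks above widen the metastore JDO pushdown: equality and inequality comparisons on integral partition columns are now accepted in addition to the previous string-only rule, with lt/gt deferred to HIVE-4888. A hedged sketch of a predicate shape the new check accepts; the column name and types are made up, and the ExprNode constructor signatures are assumptions about this branch rather than something this commit adds:

    // Illustrative: "hr = 11" on an int partition column is now pushable to JDO,
    // while "hr > 11" is still rejected until HIVE-4888.
    ExprNodeDesc hrCol   = new ExprNodeColumnDesc(TypeInfoFactory.intTypeInfo, "hr", null, true);
    ExprNodeDesc hrConst = new ExprNodeConstantDesc(TypeInfoFactory.intTypeInfo, 11);
    List<ExprNodeDesc> children = new ArrayList<ExprNodeDesc>(Arrays.asList(hrCol, hrConst));
    ExprNodeGenericFuncDesc eq = new ExprNodeGenericFuncDesc(
        TypeInfoFactory.booleanTypeInfo, new GenericUDFOPEqual(), children);
    // For a Table whose partition column "hr" has type int,
    // Utilities.checkJDOPushDown(tab, eq, null) now returns null, i.e. the filter is pushable.
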
@@ -2226,7 +2296,7 @@ public final class Utilities {
try {
MapredWork mapredWork = ((MapRedTask) task).getWork();
Set<Class<? extends InputFormat>> reworkInputFormats = new HashSet<Class<? extends InputFormat>>();
- for (PartitionDesc part : mapredWork.getPathToPartitionInfo().values()) {
+ for (PartitionDesc part : mapredWork.getMapWork().getPathToPartitionInfo().values()) {
Class<? extends InputFormat> inputFormatCls = part
.getInputFileFormatClass();
if (ReworkMapredInputFormat.class.isAssignableFrom(inputFormatCls)) {
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Tue Jul 30 22:22:35 2013
@@ -57,6 +57,7 @@ import org.apache.hadoop.hive.ql.ErrorMs
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.FetchOperator;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.exec.HiveTotalOrderPartitioner;
import org.apache.hadoop.hive.ql.exec.JobCloseFeedBack;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -64,7 +65,6 @@ import org.apache.hadoop.hive.ql.exec.Pa
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.exec.vector.VectorExecMapper;
import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
@@ -78,10 +78,12 @@ import org.apache.hadoop.hive.ql.io.OneN
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -107,7 +109,7 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.varia.NullAppender;
/**
- * ExecDriver is the central class in co-ordinating execution of any map-reduce task.
+ * ExecDriver is the central class in co-ordinating execution of any map-reduce task.
* Its main responsibilities are:
*
* - Converting the plan (MapredWork) into a MR Job (JobConf)
@@ -201,13 +203,13 @@ public class ExecDriver extends Task<Map
* @return true if fatal errors happened during job execution, false otherwise.
*/
public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
- for (Operator<? extends OperatorDesc> op : work.getAliasToWork().values()) {
+ for (Operator<? extends OperatorDesc> op : work.getMapWork().getAliasToWork().values()) {
if (op.checkFatalErrors(ctrs, errMsg)) {
return true;
}
}
- if (work.getReducer() != null) {
- if (work.getReducer().checkFatalErrors(ctrs, errMsg)) {
+ if (work.getReduceWork() != null) {
+ if (work.getReduceWork().getReducer().checkFatalErrors(ctrs, errMsg)) {
return true;
}
}
@@ -216,18 +218,18 @@ public class ExecDriver extends Task<Map
protected void createTmpDirs() throws IOException {
// fix up outputs
- Map<String, ArrayList<String>> pa = work.getPathToAliases();
+ Map<String, ArrayList<String>> pa = work.getMapWork().getPathToAliases();
if (pa != null) {
List<Operator<? extends OperatorDesc>> opList =
new ArrayList<Operator<? extends OperatorDesc>>();
- if (work.getReducer() != null) {
- opList.add(work.getReducer());
+ if (work.getReduceWork() != null) {
+ opList.add(work.getReduceWork().getReducer());
}
for (List<String> ls : pa.values()) {
for (String a : ls) {
- opList.add(work.getAliasToWork().get(a));
+ opList.add(work.getMapWork().getAliasToWork().get(a));
while (!opList.isEmpty()) {
Operator<? extends OperatorDesc> op = opList.remove(0);
@@ -256,6 +258,7 @@ public class ExecDriver extends Task<Map
/**
* Execute a query plan using Hadoop.
*/
+ @SuppressWarnings("deprecation")
@Override
public int execute(DriverContext driverContext) {
@@ -264,16 +267,14 @@ public class ExecDriver extends Task<Map
boolean success = true;
- String invalidReason = work.isInvalid();
- if (invalidReason != null) {
- throw new RuntimeException("Plan invalid, Reason: " + invalidReason);
- }
-
Context ctx = driverContext.getCtx();
boolean ctxCreated = false;
String emptyScratchDirStr;
Path emptyScratchDir;
+ MapWork mWork = work.getMapWork();
+ ReduceWork rWork = work.getReduceWork();
+
try {
if (ctx == null) {
ctx = new Context(job);
@@ -322,27 +323,27 @@ public class ExecDriver extends Task<Map
throw new RuntimeException(e.getMessage());
}
- if (work.getNumMapTasks() != null) {
- job.setNumMapTasks(work.getNumMapTasks().intValue());
+ if (mWork.getNumMapTasks() != null) {
+ job.setNumMapTasks(mWork.getNumMapTasks().intValue());
}
- if (work.getMaxSplitSize() != null) {
- HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMAXSPLITSIZE, work.getMaxSplitSize().longValue());
+ if (mWork.getMaxSplitSize() != null) {
+ HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMAXSPLITSIZE, mWork.getMaxSplitSize().longValue());
}
- if (work.getMinSplitSize() != null) {
- HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZE, work.getMinSplitSize().longValue());
+ if (mWork.getMinSplitSize() != null) {
+ HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZE, mWork.getMinSplitSize().longValue());
}
- if (work.getMinSplitSizePerNode() != null) {
- HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERNODE, work.getMinSplitSizePerNode().longValue());
+ if (mWork.getMinSplitSizePerNode() != null) {
+ HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERNODE, mWork.getMinSplitSizePerNode().longValue());
}
- if (work.getMinSplitSizePerRack() != null) {
- HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERRACK, work.getMinSplitSizePerRack().longValue());
+ if (mWork.getMinSplitSizePerRack() != null) {
+ HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERRACK, mWork.getMinSplitSizePerRack().longValue());
}
- job.setNumReduceTasks(work.getNumReduceTasks().intValue());
+ job.setNumReduceTasks(rWork != null ? rWork.getNumReduceTasks().intValue() : 0);
job.setReducerClass(ExecReducer.class);
// set input format information if necessary
@@ -359,7 +360,7 @@ public class ExecDriver extends Task<Map
inpFormat = ShimLoader.getHadoopShims().getInputFormatClassName();
}
- if (getWork().isUseBucketizedHiveInputFormat()) {
+ if (mWork.isUseBucketizedHiveInputFormat()) {
inpFormat = BucketizedHiveInputFormat.class.getName();
}
@@ -408,11 +409,11 @@ public class ExecDriver extends Task<Map
}
try{
- MapredLocalWork localwork = work.getMapLocalWork();
+ MapredLocalWork localwork = mWork.getMapLocalWork();
if (localwork != null) {
if (!ShimLoader.getHadoopShims().isLocalMode(job)) {
Path localPath = new Path(localwork.getTmpFileURI());
- Path hdfsPath = new Path(work.getTmpHDFSFileURI());
+ Path hdfsPath = new Path(mWork.getTmpHDFSFileURI());
FileSystem hdfs = hdfsPath.getFileSystem(job);
FileSystem localFS = localPath.getFileSystem(job);
@@ -450,17 +451,17 @@ public class ExecDriver extends Task<Map
}
}
work.configureJobConf(job);
- addInputPaths(job, work, emptyScratchDirStr, ctx);
+ addInputPaths(job, mWork, emptyScratchDirStr, ctx);
Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
- if (work.getSamplingType() > 0 && work.getNumReduceTasks() > 1) {
+ if (mWork.getSamplingType() > 0 && rWork != null && rWork.getNumReduceTasks() > 1) {
try {
- handleSampling(driverContext, work, job, new HiveConf(conf));
+ handleSampling(driverContext, mWork, job, conf);
job.setPartitionerClass(HiveTotalOrderPartitioner.class);
} catch (Exception e) {
console.printInfo("Not enough sampling data.. Rolling back to single reducer task");
- work.setNumReduceTasks(1);
+ rWork.setNumReduceTasks(1);
job.setNumReduceTasks(1);
}
}
@@ -475,7 +476,7 @@ public class ExecDriver extends Task<Map
// make this client wait if the job tracker is not behaving well.
Throttle.checkJobTracker(job, LOG);
- if (work.isGatheringStats()) {
+ if (mWork.isGatheringStats() || (rWork != null && rWork.isGatheringStats())) {
// initialize stats publishing table
StatsPublisher statsPublisher;
String statsImplementationClass = HiveConf.getVar(job, HiveConf.ConfVars.HIVESTATSDBCLASS);
@@ -517,7 +518,7 @@ public class ExecDriver extends Task<Map
success = false;
returnVal = 1;
} finally {
- Utilities.clearMapRedWork(job);
+ Utilities.clearWork(job);
try {
if (ctxCreated) {
ctx.clear();
@@ -538,13 +539,13 @@ public class ExecDriver extends Task<Map
try {
if (rj != null) {
JobCloseFeedBack feedBack = new JobCloseFeedBack();
- if (work.getAliasToWork() != null) {
- for (Operator<? extends OperatorDesc> op : work.getAliasToWork().values()) {
+ if (mWork.getAliasToWork() != null) {
+ for (Operator<? extends OperatorDesc> op : mWork.getAliasToWork().values()) {
op.jobClose(job, success, feedBack);
}
}
- if (work.getReducer() != null) {
- work.getReducer().jobClose(job, success, feedBack);
+ if (rWork != null) {
+ rWork.getReducer().jobClose(job, success, feedBack);
}
}
} catch (Exception e) {
@@ -562,7 +563,7 @@ public class ExecDriver extends Task<Map
private boolean validateVectorPath() {
LOG.debug("Validating if vectorized execution is applicable");
- MapredWork thePlan = this.getWork();
+ MapWork thePlan = this.getWork().getMapWork();
for (String path : thePlan.getPathToPartitionInfo().keySet()) {
PartitionDesc pd = thePlan.getPathToPartitionInfo().get(path);
@@ -613,16 +614,16 @@ public class ExecDriver extends Task<Map
}
}
- private void handleSampling(DriverContext context, MapredWork work, JobConf job, HiveConf conf)
+ private void handleSampling(DriverContext context, MapWork mWork, JobConf job, HiveConf conf)
throws Exception {
- assert work.getAliasToWork().keySet().size() == 1;
+ assert mWork.getAliasToWork().keySet().size() == 1;
- String alias = work.getAliases().get(0);
- Operator<?> topOp = work.getAliasToWork().get(alias);
- PartitionDesc partDesc = work.getAliasToPartnInfo().get(alias);
+ String alias = mWork.getAliases().get(0);
+ Operator<?> topOp = mWork.getAliasToWork().get(alias);
+ PartitionDesc partDesc = mWork.getAliasToPartnInfo().get(alias);
- ArrayList<String> paths = work.getPaths();
- ArrayList<PartitionDesc> parts = work.getPartitionDescs();
+ ArrayList<String> paths = mWork.getPaths();
+ ArrayList<PartitionDesc> parts = mWork.getPartitionDescs();
Path onePath = new Path(paths.get(0));
String tmpPath = context.getCtx().getExternalTmpFileURI(onePath.toUri());
@@ -632,7 +633,7 @@ public class ExecDriver extends Task<Map
PartitionKeySampler sampler = new PartitionKeySampler();
- if (work.getSamplingType() == MapredWork.SAMPLING_ON_PREV_MR) {
+ if (mWork.getSamplingType() == MapWork.SAMPLING_ON_PREV_MR) {
console.printInfo("Use sampling data created in previous MR");
// merges sampling data from the previous MR and makes partition keys for total sort
for (String path : paths) {
@@ -642,7 +643,7 @@ public class ExecDriver extends Task<Map
sampler.addSampleFile(status.getPath(), job);
}
}
- } else if (work.getSamplingType() == MapredWork.SAMPLING_ON_START) {
+ } else if (mWork.getSamplingType() == MapWork.SAMPLING_ON_START) {
console.printInfo("Creating sampling data..");
assert topOp instanceof TableScanOperator;
TableScanOperator ts = (TableScanOperator) topOp;
@@ -666,7 +667,7 @@ public class ExecDriver extends Task<Map
fetcher.clearFetchContext();
}
} else {
- throw new IllegalArgumentException("Invalid sampling type " + work.getSamplingType());
+ throw new IllegalArgumentException("Invalid sampling type " + mWork.getSamplingType());
}
sampler.writePartitionKeys(partitionFile, job);
}
@@ -675,16 +676,17 @@ public class ExecDriver extends Task<Map
* Set hive input format, and input format file if necessary.
*/
protected void setInputAttributes(Configuration conf) {
- if (work.getInputformat() != null) {
- HiveConf.setVar(conf, HiveConf.ConfVars.HIVEINPUTFORMAT, work.getInputformat());
- }
- if (work.getIndexIntermediateFile() != null) {
- conf.set("hive.index.compact.file", work.getIndexIntermediateFile());
- conf.set("hive.index.blockfilter.file", work.getIndexIntermediateFile());
+ MapWork mWork = work.getMapWork();
+ if (mWork.getInputformat() != null) {
+ HiveConf.setVar(conf, HiveConf.ConfVars.HIVEINPUTFORMAT, mWork.getInputformat());
+ }
+ if (mWork.getIndexIntermediateFile() != null) {
+ conf.set("hive.index.compact.file", mWork.getIndexIntermediateFile());
+ conf.set("hive.index.blockfilter.file", mWork.getIndexIntermediateFile());
}
// Intentionally overwrites anything the user may have put here
- conf.setBoolean("hive.input.format.sorted", work.isInputFormatSorted());
+ conf.setBoolean("hive.input.format.sorted", mWork.isInputFormatSorted());
}
public boolean mapStarted() {
@@ -831,12 +833,12 @@ public class ExecDriver extends Task<Map
int ret;
if (localtask) {
memoryMXBean = ManagementFactory.getMemoryMXBean();
- MapredLocalWork plan = Utilities.deserializeMapRedLocalWork(pathData, conf);
+ MapredLocalWork plan = (MapredLocalWork) Utilities.deserializeObject(pathData);
MapredLocalTask ed = new MapredLocalTask(plan, conf, isSilent);
ret = ed.executeFromChildJVM(new DriverContext());
} else {
- MapredWork plan = Utilities.deserializeMapRedWork(pathData, conf);
+ MapredWork plan = (MapredWork) Utilities.deserializeObject(pathData);
ExecDriver ed = new ExecDriver(plan, conf, isSilent);
ret = ed.execute(new DriverContext());
}
@@ -897,19 +899,19 @@ public class ExecDriver extends Task<Map
@Override
public Collection<Operator<? extends OperatorDesc>> getTopOperators() {
- return getWork().getAliasToWork().values();
+ return getWork().getMapWork().getAliasToWork().values();
}
@Override
public boolean hasReduce() {
MapredWork w = getWork();
- return w.getReducer() != null;
+ return w.getReduceWork() != null;
}
/**
* Handle an empty/null path for a given alias.
*/
- private static int addInputPath(String path, JobConf job, MapredWork work, String hiveScratchDir,
+ private static int addInputPath(String path, JobConf job, MapWork work, String hiveScratchDir,
int numEmptyPaths, boolean isEmptyPath, String alias) throws Exception {
// either the directory does not exist or it is empty
assert path == null || isEmptyPath;
@@ -993,7 +995,7 @@ public class ExecDriver extends Task<Map
return numEmptyPaths;
}
- public static void addInputPaths(JobConf job, MapredWork work, String hiveScratchDir, Context ctx)
+ public static void addInputPaths(JobConf job, MapWork work, String hiveScratchDir, Context ctx)
throws Exception {
int numEmptyPaths = 0;
@@ -1076,11 +1078,11 @@ public class ExecDriver extends Task<Map
@Override
public void updateCounters(Counters ctrs, RunningJob rj) throws IOException {
- for (Operator<? extends OperatorDesc> op : work.getAliasToWork().values()) {
+ for (Operator<? extends OperatorDesc> op : work.getMapWork().getAliasToWork().values()) {
op.updateCounters(ctrs);
}
- if (work.getReducer() != null) {
- work.getReducer().updateCounters(ctrs);
+ if (work.getReduceWork() != null) {
+ work.getReduceWork().getReducer().updateCounters(ctrs);
}
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java Tue Jul 30 22:22:35 2013
@@ -30,11 +30,11 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.FetchOperator;
import org.apache.hadoop.hive.ql.exec.MapOperator;
-import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
@@ -45,10 +45,10 @@ import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.util.StringUtils;
/**
- * ExecMapper is the generic Map class for Hive. Together with ExecReducer it is
+ * ExecMapper is the generic Map class for Hive. Together with ExecReducer it is
* the bridge between the map-reduce framework and the Hive operator pipeline at
* execution time. Its main responsibilities are:
- *
+ *
* - Load and setup the operator pipeline from XML
* - Run the pipeline by transforming key value pairs to records and forwarding them to the operators
* - Stop execution when the "limit" is reached
@@ -96,7 +96,7 @@ public class ExecMapper extends MapReduc
jc = job;
execContext.setJc(jc);
// create map and fetch operators
- MapredWork mrwork = Utilities.getMapRedWork(job);
+ MapWork mrwork = Utilities.getMapWork(job);
mo = new MapOperator();
mo.setConf(mrwork);
// initialize map operator
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java Tue Jul 30 22:22:35 2013
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.ql.exec.Op
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDe;
@@ -112,7 +112,7 @@ public class ExecReducer extends MapRedu
l4j.info("cannot get classpath: " + e.getMessage());
}
jc = job;
- MapredWork gWork = Utilities.getMapRedWork(job);
+ ReduceWork gWork = Utilities.getReduceWork(job);
reducer = gWork.getReducer();
reducer.setParentOperators(null); // clear out any parents as reducer is the
// root
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java Tue Jul 30 22:22:35 2013
@@ -40,8 +40,10 @@ import org.apache.hadoop.hive.ql.exec.Op
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapred.JobConf;
@@ -101,7 +103,7 @@ public class MapRedTask extends ExecDriv
conf.getBoolVar(HiveConf.ConfVars.LOCALMODEAUTO)) {
if (inputSummary == null) {
- inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work, null);
+ inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null);
}
// set the values of totalInputFileSize and totalInputNumFiles, estimating them
@@ -109,7 +111,7 @@ public class MapRedTask extends ExecDriv
estimateInputSize();
// at this point the number of reducers is precisely defined in the plan
- int numReducers = work.getNumReduceTasks();
+ int numReducers = work.getReduceWork() == null ? 0 : work.getReduceWork().getNumReduceTasks();
if (LOG.isDebugEnabled()) {
LOG.debug("Task: " + getId() + ", Summary: " +
@@ -177,7 +179,7 @@ public class MapRedTask extends ExecDriv
OutputStream out = FileSystem.getLocal(conf).create(planPath);
MapredWork plan = getWork();
LOG.info("Generating plan file " + planPath.toString());
- Utilities.serializeMapRedWork(plan, out);
+ Utilities.serializeObject(plan, out);
String isSilent = "true".equalsIgnoreCase(System
.getProperty("test.silent")) ? "-nolog" : "";
@@ -383,26 +385,26 @@ public class MapRedTask extends ExecDriv
* Set the number of reducers for the mapred work.
*/
private void setNumberOfReducers() throws IOException {
+ ReduceWork rWork = work.getReduceWork();
// this is a temporary hack to fix things that are not fixed in the compiler
- Integer numReducersFromWork = work.getNumReduceTasks();
+ Integer numReducersFromWork = rWork == null ? 0 : rWork.getNumReduceTasks();
- if (work.getReducer() == null) {
+ if (rWork == null) {
console
.printInfo("Number of reduce tasks is set to 0 since there's no reduce operator");
- work.setNumReduceTasks(Integer.valueOf(0));
} else {
if (numReducersFromWork >= 0) {
console.printInfo("Number of reduce tasks determined at compile time: "
- + work.getNumReduceTasks());
+ + rWork.getNumReduceTasks());
} else if (job.getNumReduceTasks() > 0) {
int reducers = job.getNumReduceTasks();
- work.setNumReduceTasks(reducers);
+ rWork.setNumReduceTasks(reducers);
console
.printInfo("Number of reduce tasks not specified. Defaulting to jobconf value of: "
+ reducers);
} else {
int reducers = estimateNumberOfReducers();
- work.setNumReduceTasks(reducers);
+ rWork.setNumReduceTasks(reducers);
console
.printInfo("Number of reduce tasks not specified. Estimated from input data size: "
+ reducers);
@@ -437,7 +439,7 @@ public class MapRedTask extends ExecDriv
if(inputSummary == null) {
// compute the summary and stash it away
- inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work, null);
+ inputSummary = Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null);
}
// if all inputs are sampled, we should shrink the size of reducers accordingly.
@@ -459,7 +461,7 @@ public class MapRedTask extends ExecDriv
// and the user has configured Hive to do this, make sure the number of reducers is a
// power of two
if (conf.getBoolVar(HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT_NUM_BUCKETS_POWER_TWO) &&
- work.isFinalMapRed() && !work.getBucketedColsByDirectory().isEmpty()) {
+ work.isFinalMapRed() && !work.getMapWork().getBucketedColsByDirectory().isEmpty()) {
int reducersLog = (int)(Math.log(reducers) / Math.log(2)) + 1;
int reducersPowerTwo = (int)Math.pow(2, reducersLog);
@@ -497,11 +499,13 @@ public class MapRedTask extends ExecDriv
return;
}
+ MapWork mWork = work.getMapWork();
+
// Initialize the values to be those taken from the input summary
totalInputFileSize = inputSummary.getLength();
totalInputNumFiles = inputSummary.getFileCount();
- if (work.getNameToSplitSample() == null || work.getNameToSplitSample().isEmpty()) {
+ if (mWork.getNameToSplitSample() == null || mWork.getNameToSplitSample().isEmpty()) {
// If percentage block sampling wasn't used, we don't need to do any estimation
inputSizeEstimated = true;
return;
@@ -510,10 +514,10 @@ public class MapRedTask extends ExecDriv
// if all inputs are sampled, we should shrink the size of the input accordingly
double highestSamplePercentage = 0;
boolean allSample = false;
- for (String alias : work.getAliasToWork().keySet()) {
- if (work.getNameToSplitSample().containsKey(alias)) {
+ for (String alias : mWork.getAliasToWork().keySet()) {
+ if (mWork.getNameToSplitSample().containsKey(alias)) {
allSample = true;
- Double rate = work.getNameToSplitSample().get(alias).getPercent();
+ Double rate = mWork.getNameToSplitSample().get(alias).getPercent();
if (rate != null && rate > highestSamplePercentage) {
highestSamplePercentage = rate;
}
@@ -580,7 +584,7 @@ public class MapRedTask extends ExecDriv
@Override
public Operator<? extends OperatorDesc> getReducer() {
- return getWork().getReducer();
+ return getWork().getReduceWork() == null ? null : getWork().getReduceWork().getReducer();
}
@Override
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java Tue Jul 30 22:22:35 2013
@@ -141,7 +141,7 @@ public class MapredLocalTask extends Tas
OutputStream out = FileSystem.getLocal(conf).create(planPath);
MapredLocalWork plan = getWork();
LOG.info("Generating plan file " + planPath.toString());
- Utilities.serializeMapRedLocalWork(plan, out);
+ Utilities.serializeObject(plan, out);
String isSilent = "true".equalsIgnoreCase(System.getProperty("test.silent")) ? "-nolog" : "";
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExecMapper.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExecMapper.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExecMapper.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExecMapper.java Tue Jul 30 22:22:35 2013
@@ -28,10 +28,10 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.exec.FetchOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -93,7 +93,7 @@ public class VectorExecMapper extends Ma
mo.setChildren(job);
l4j.info(mo.dump(0));
// initialize map local work
- localWork = mrwork.getMapLocalWork();
+ localWork = mrwork.getMapWork().getMapLocalWork();
execContext.setLocalWork(localWork);
mo.setExecContext(execContext);
@@ -205,7 +205,7 @@ public class VectorExecMapper extends Ma
}
if (fetchOperators != null) {
- MapredLocalWork localWork = mo.getConf().getMapLocalWork();
+ MapredLocalWork localWork = mo.getConf().getMapWork().getMapLocalWork();
for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
Operator<? extends OperatorDesc> forwardOp = localWork
.getAliasToWork().get(entry.getKey());
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java Tue Jul 30 22:22:35 2013
@@ -33,8 +33,8 @@ import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.plan.MapredWork;
@@ -242,7 +242,7 @@ public class VectorMapOperator extends O
throws HiveException,
ClassNotFoundException, InstantiationException, IllegalAccessException,
SerDeException {
- PartitionDesc pd = conf.getPathToPartitionInfo().get(onefile);
+ PartitionDesc pd = conf.getMapWork().getPathToPartitionInfo().get(onefile);
LinkedHashMap<String, String> partSpec = pd.getPartSpec();
// Use tblProps in case of unpartitioned tables
Properties partProps =
@@ -337,8 +337,8 @@ public class VectorMapOperator extends O
Set<TableDesc> identityConverterTableDesc = new HashSet<TableDesc>();
try
{
- for (String onefile : conf.getPathToAliases().keySet()) {
- PartitionDesc pd = conf.getPathToPartitionInfo().get(onefile);
+ for (String onefile : conf.getMapWork().getPathToAliases().keySet()) {
+ PartitionDesc pd = conf.getMapWork().getPathToPartitionInfo().get(onefile);
TableDesc tableDesc = pd.getTableDesc();
Properties tblProps = tableDesc.getProperties();
// If the partition does not exist, use table properties
@@ -416,7 +416,7 @@ public class VectorMapOperator extends O
new HashMap<String, Operator<? extends OperatorDesc>>();
try {
- for (String onefile : conf.getPathToAliases().keySet()) {
+ for (String onefile : conf.getMapWork().getPathToAliases().keySet()) {
MapOpCtx opCtx = initObjectInspector(conf, hconf, onefile, convertedOI);
//Create columnMap
Map<String, Integer> columnMap = new HashMap<String, Integer>();
@@ -433,12 +433,12 @@ public class VectorMapOperator extends O
}
Path onepath = new Path(new Path(onefile).toUri().getPath());
- List<String> aliases = conf.getPathToAliases().get(onefile);
+ List<String> aliases = conf.getMapWork().getPathToAliases().get(onefile);
vectorizationContext = new VectorizationContext(columnMap, columnCount);
for (String onealias : aliases) {
- Operator<? extends OperatorDesc> op = conf.getAliasToWork().get(
+ Operator<? extends OperatorDesc> op = conf.getMapWork().getAliasToWork().get(
onealias);
LOG.info("Adding alias " + onealias + " to work list for file "
+ onefile);
@@ -640,14 +640,14 @@ public class VectorMapOperator extends O
Path fpath = new Path((new Path(this.getExecContext().getCurrentInputFile()))
.toUri().getPath());
- for (String onefile : conf.getPathToAliases().keySet()) {
+ for (String onefile : conf.getMapWork().getPathToAliases().keySet()) {
Path onepath = new Path(new Path(onefile).toUri().getPath());
// check for the operators who will process rows coming to this Map
// Operator
if (!onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) {
- String onealias = conf.getPathToAliases().get(onefile).get(0);
+ String onealias = conf.getMapWork().getPathToAliases().get(onefile).get(0);
Operator<? extends OperatorDesc> op =
- conf.getAliasToWork().get(onealias);
+ conf.getMapWork().getAliasToWork().get(onealias);
LOG.info("Processing alias " + onealias + " for file " + onefile);
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java Tue Jul 30 22:22:35 2013
@@ -118,7 +118,7 @@ public class VectorizedRowBatchCtx {
IllegalAccessException, HiveException {
Map<String, PartitionDesc> pathToPartitionInfo = Utilities
- .getMapRedWork(hiveConf).getPathToPartitionInfo();
+ .getMapRedWork(hiveConf).getMapWork().getPathToPartitionInfo();
PartitionDesc part = HiveFileFormatUtils
.getPartitionDescFromPathRecursively(pathToPartitionInfo,
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/index/compact/CompactIndexHandler.java Tue Jul 30 22:22:35 2013
@@ -55,6 +55,7 @@ import org.apache.hadoop.hive.ql.parse.P
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
@@ -188,7 +189,7 @@ public class CompactIndexHandler extends
if (pctx.getConf().getBoolVar(ConfVars.HIVE_INDEX_COMPACT_BINARY_SEARCH) && useSorted) {
// For now, only works if the predicate is a single condition
- MapredWork work = null;
+ MapWork work = null;
String originalInputFormat = null;
for (Task task : driver.getPlan().getRootTasks()) {
// The index query should have one and only one map reduce task in the root tasks
@@ -202,7 +203,9 @@ public class CompactIndexHandler extends
work.setInputFormatSorted(false);
break;
}
- work = (MapredWork)task.getWork();
+ if (task.getWork() != null) {
+ work = ((MapredWork)task.getWork()).getMapWork();
+ }
String inputFormat = work.getInputformat();
originalInputFormat = inputFormat;
if (inputFormat == null) {
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java Tue Jul 30 22:22:35 2013
@@ -95,7 +95,7 @@ public class CombineHiveInputFormat<K ex
this.inputSplitShim = inputSplitShim;
if (job != null) {
Map<String, PartitionDesc> pathToPartitionInfo = Utilities
- .getMapRedWork(job).getPathToPartitionInfo();
+ .getMapWork(job).getPathToPartitionInfo();
// extract all the inputFormatClass names for each chunk in the
// CombinedSplit.
@@ -200,7 +200,7 @@ public class CombineHiveInputFormat<K ex
if (inputFormatClassName == null) {
Map<String, PartitionDesc> pathToPartitionInfo = Utilities
- .getMapRedWork(getJob()).getPathToPartitionInfo();
+ .getMapWork(getJob()).getPathToPartitionInfo();
// extract all the inputFormatClass names for each chunk in the
// CombinedSplit.
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Tue Jul 30 22:22:35 2013
@@ -37,7 +37,7 @@ import org.apache.hadoop.hive.ql.exec.Op
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
@@ -249,10 +249,10 @@ public class HiveInputFormat<K extends W
}
protected Map<String, PartitionDesc> pathToPartitionInfo;
- MapredWork mrwork = null;
+ MapWork mrwork = null;
protected void init(JobConf job) {
- mrwork = Utilities.getMapRedWork(job);
+ mrwork = Utilities.getMapWork(job);
pathToPartitionInfo = mrwork.getPathToPartitionInfo();
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java Tue Jul 30 22:22:35 2013
@@ -37,10 +37,10 @@ import org.apache.hadoop.mapred.TextInpu
public class SymbolicInputFormat implements ReworkMapredInputFormat {
public void rework(HiveConf job, MapredWork work) throws IOException {
- Map<String, PartitionDesc> pathToParts = work.getPathToPartitionInfo();
+ Map<String, PartitionDesc> pathToParts = work.getMapWork().getPathToPartitionInfo();
List<String> toRemovePaths = new ArrayList<String>();
Map<String, PartitionDesc> toAddPathToPart = new HashMap<String, PartitionDesc>();
- Map<String, ArrayList<String>> pathToAliases = work.getPathToAliases();
+ Map<String, ArrayList<String>> pathToAliases = work.getMapWork().getPathToAliases();
for (Map.Entry<String, PartitionDesc> pathPartEntry : pathToParts
.entrySet()) {
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordReader.java Tue Jul 30 22:22:35 2013
@@ -18,6 +18,10 @@
package org.apache.hadoop.hive.ql.io.avro;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericData;
@@ -29,18 +33,17 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeException;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.*;
-
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Properties;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
/**
* RecordReader optimized against Avro GenericRecords that returns to record
@@ -67,7 +70,9 @@ public class AvroGenericRecordReader imp
GenericDatumReader<GenericRecord> gdr = new GenericDatumReader<GenericRecord>();
- if(latest != null) gdr.setExpected(latest);
+ if(latest != null) {
+ gdr.setExpected(latest);
+ }
this.reader = new DataFileReader<GenericRecord>(new FsInput(split.getPath(), job), gdr);
this.reader.sync(split.getStart());
@@ -86,11 +91,11 @@ public class AvroGenericRecordReader imp
FileSystem fs = split.getPath().getFileSystem(job);
// Inside of a MR job, we can pull out the actual properties
if(AvroSerdeUtils.insideMRJob(job)) {
- MapredWork mapRedWork = Utilities.getMapRedWork(job);
+ MapWork mapWork = Utilities.getMapWork(job);
// Iterate over the Path -> Partition descriptions to find the partition
// that matches our input split.
- for (Map.Entry<String,PartitionDesc> pathsAndParts: mapRedWork.getPathToPartitionInfo().entrySet()){
+ for (Map.Entry<String,PartitionDesc> pathsAndParts: mapWork.getPathToPartitionInfo().entrySet()){
String partitionPath = pathsAndParts.getKey();
if(pathIsInPartition(split.getPath(), partitionPath)) {
if(LOG.isInfoEnabled()) {
@@ -101,11 +106,15 @@ public class AvroGenericRecordReader imp
Properties props = pathsAndParts.getValue().getProperties();
if(props.containsKey(AvroSerdeUtils.SCHEMA_LITERAL) || props.containsKey(AvroSerdeUtils.SCHEMA_URL)) {
return AvroSerdeUtils.determineSchemaOrThrowException(props);
- } else
+ }
+ else {
return null; // If it's not in this property, it won't be in any others
+ }
}
}
- if(LOG.isInfoEnabled()) LOG.info("Unable to match filesplit " + split + " with a partition.");
+ if(LOG.isInfoEnabled()) {
+ LOG.info("Unable to match filesplit " + split + " with a partition.");
+ }
}
// In "select * from table" situations (non-MR), we can add things to the job
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ColumnStatisticsImpl.java Tue Jul 30 22:22:35 2013
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.io.orc;
import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
@@ -543,6 +544,97 @@ class ColumnStatisticsImpl implements Co
}
}
+ private static final class DateStatisticsImpl extends ColumnStatisticsImpl
+ implements DateColumnStatistics {
+ private DateWritable minimum = null;
+ private DateWritable maximum = null;
+
+ DateStatisticsImpl() {
+ }
+
+ DateStatisticsImpl(OrcProto.ColumnStatistics stats) {
+ super(stats);
+ OrcProto.DateStatistics dateStats = stats.getDateStatistics();
+ // min,max values serialized/deserialized as int (days since epoch)
+ if (dateStats.hasMaximum()) {
+ maximum = new DateWritable(dateStats.getMaximum());
+ }
+ if (dateStats.hasMinimum()) {
+ minimum = new DateWritable(dateStats.getMinimum());
+ }
+ }
+
+ @Override
+ void reset() {
+ super.reset();
+ minimum = null;
+ maximum = null;
+ }
+
+ @Override
+ void updateDate(DateWritable value) {
+ if (minimum == null) {
+ minimum = value;
+ maximum = value;
+ } else if (minimum.compareTo(value) > 0) {
+ minimum = value;
+ } else if (maximum.compareTo(value) < 0) {
+ maximum = value;
+ }
+ }
+
+ @Override
+ void merge(ColumnStatisticsImpl other) {
+ super.merge(other);
+ DateStatisticsImpl dateStats = (DateStatisticsImpl) other;
+ if (minimum == null) {
+ minimum = dateStats.minimum;
+ maximum = dateStats.maximum;
+ } else if (dateStats.minimum != null) {
+ if (minimum.compareTo(dateStats.minimum) > 0) {
+ minimum = dateStats.minimum;
+ } else if (maximum.compareTo(dateStats.maximum) < 0) {
+ maximum = dateStats.maximum;
+ }
+ }
+ }
+
+ @Override
+ OrcProto.ColumnStatistics.Builder serialize() {
+ OrcProto.ColumnStatistics.Builder result = super.serialize();
+ OrcProto.DateStatistics.Builder dateStats =
+ OrcProto.DateStatistics.newBuilder();
+ if (getNumberOfValues() != 0) {
+ dateStats.setMinimum(minimum.getDays());
+ dateStats.setMaximum(maximum.getDays());
+ }
+ result.setDateStatistics(dateStats);
+ return result;
+ }
+
+ @Override
+ public DateWritable getMinimum() {
+ return minimum;
+ }
+
+ @Override
+ public DateWritable getMaximum() {
+ return maximum;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder buf = new StringBuilder(super.toString());
+ if (getNumberOfValues() != 0) {
+ buf.append(" min: ");
+ buf.append(minimum);
+ buf.append(" max: ");
+ buf.append(maximum);
+ }
+ return buf.toString();
+ }
+ }
+
private long count = 0;
ColumnStatisticsImpl(OrcProto.ColumnStatistics stats) {
@@ -578,6 +670,10 @@ class ColumnStatisticsImpl implements Co
throw new UnsupportedOperationException("Can't update decimal");
}
+ void updateDate(DateWritable value) {
+ throw new UnsupportedOperationException("Can't update date");
+ }
+
void merge(ColumnStatisticsImpl stats) {
count += stats.count;
}
@@ -621,6 +717,8 @@ class ColumnStatisticsImpl implements Co
return new StringStatisticsImpl();
case DECIMAL:
return new DecimalStatisticsImpl();
+ case DATE:
+ return new DateStatisticsImpl();
default:
return new ColumnStatisticsImpl();
}
@@ -640,6 +738,8 @@ class ColumnStatisticsImpl implements Co
return new StringStatisticsImpl(stats);
} else if (stats.hasDecimalStatistics()) {
return new DecimalStatisticsImpl(stats);
+ } else if (stats.hasDateStatistics()) {
+ return new DateStatisticsImpl(stats);
} else {
return new ColumnStatisticsImpl(stats);
}
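DateStatisticsImpl keeps its minimum and maximum as DateWritable and serializes them as days since the epoch, matching the fields in OrcProto.DateStatistics. One detail worth flagging: in merge() the maximum comparison sits in an else-if behind the minimum comparison, so a call that lowers the running minimum does not examine the incoming maximum. A standalone sketch of the range-tracking idea with the two bounds checked independently (class and method names are illustrative, not part of the commit):

import org.apache.hadoop.hive.serde2.io.DateWritable;

public class DateRangeSketch {
  private DateWritable minimum;
  private DateWritable maximum;

  // Widen the [minimum, maximum] range for one non-null value (mirrors updateDate()).
  void update(DateWritable value) {
    if (minimum == null) {
      minimum = value;
      maximum = value;
    } else if (minimum.compareTo(value) > 0) {
      minimum = value;
    } else if (maximum.compareTo(value) < 0) {
      maximum = value;
    }
  }

  // Merge another range; minimum and maximum are considered independently here.
  void merge(DateRangeSketch other) {
    if (other.minimum == null) {
      return;
    }
    if (minimum == null) {
      minimum = other.minimum;
      maximum = other.maximum;
      return;
    }
    if (minimum.compareTo(other.minimum) > 0) {
      minimum = other.minimum;
    }
    if (maximum.compareTo(other.maximum) < 0) {
      maximum = other.maximum;
    }
  }

  public static void main(String[] args) {
    DateRangeSketch a = new DateRangeSketch();
    a.update(new DateWritable(15000));            // values are days since the epoch
    a.update(new DateWritable(15900));
    DateRangeSketch b = new DateRangeSketch();
    b.update(new DateWritable(14000));
    b.update(new DateWritable(16000));
    a.merge(b);
    System.out.println(a.minimum.getDays() + " .. " + a.maximum.getDays());   // 14000 .. 16000
  }
}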
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java Tue Jul 30 22:22:35 2013
@@ -475,6 +475,8 @@ final class OrcStruct implements Writabl
return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
case TIMESTAMP:
return PrimitiveObjectInspectorFactory.javaTimestampObjectInspector;
+ case DATE:
+ return PrimitiveObjectInspectorFactory.javaDateObjectInspector;
case DECIMAL:
return PrimitiveObjectInspectorFactory.javaHiveDecimalObjectInspector;
default:
@@ -519,6 +521,8 @@ final class OrcStruct implements Writabl
return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
case TIMESTAMP:
return PrimitiveObjectInspectorFactory.javaTimestampObjectInspector;
+ case DATE:
+ return PrimitiveObjectInspectorFactory.javaDateObjectInspector;
case DECIMAL:
return PrimitiveObjectInspectorFactory.javaHiveDecimalObjectInspector;
case STRUCT:
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Tue Jul 30 22:22:35 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io.orc
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.sql.Date;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hive.ql.exec.ve
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
import org.apache.hadoop.io.BooleanWritable;
@@ -862,6 +864,50 @@ class RecordReaderImpl implements Record
}
}
+ private static class DateTreeReader extends TreeReader{
+ private RunLengthIntegerReader reader = null;
+
+ DateTreeReader(Path path, int columnId) {
+ super(path, columnId);
+ }
+
+ @Override
+ void startStripe(Map<StreamName, InStream> streams,
+ List<OrcProto.ColumnEncoding> encodings
+ ) throws IOException {
+ super.startStripe(streams, encodings);
+ StreamName name = new StreamName(columnId,
+ OrcProto.Stream.Kind.DATA);
+ reader = new RunLengthIntegerReader(streams.get(name), true);
+ }
+
+ @Override
+ void seek(PositionProvider[] index) throws IOException {
+ super.seek(index);
+ reader.seek(index[columnId]);
+ }
+
+ @Override
+ Object next(Object previous) throws IOException {
+ super.next(previous);
+ Date result = null;
+ if (valuePresent) {
+ if (previous == null) {
+ result = new Date(0);
+ } else {
+ result = (Date) previous;
+ }
+ result.setTime(DateWritable.daysToMillis((int) reader.next()));
+ }
+ return result;
+ }
+
+ @Override
+ void skipRows(long items) throws IOException {
+ reader.skip(countNonNulls(items));
+ }
+ }
+
private static class DecimalTreeReader extends TreeReader{
private InStream valueStream;
private RunLengthIntegerReader scaleStream;
@@ -1453,6 +1499,8 @@ class RecordReaderImpl implements Record
return new BinaryTreeReader(path, columnId);
case TIMESTAMP:
return new TimestampTreeReader(path, columnId);
+ case DATE:
+ return new DateTreeReader(path, columnId);
case DECIMAL:
return new DecimalTreeReader(path, columnId);
case STRUCT:
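DateTreeReader reads the DATA stream as run-length-encoded integers and turns each value back into a java.sql.Date through DateWritable.daysToMillis, reusing the previous object when the caller supplies one. A tiny standalone sketch of that decoding step (the literal day count is just an example):

import java.sql.Date;

import org.apache.hadoop.hive.serde2.io.DateWritable;

public class DateDecodeSketch {
  public static void main(String[] args) {
    int days = 15916;                              // what RunLengthIntegerReader.next() would hand back
    Date reused = new Date(0);                     // the reader reuses the previous object when possible
    reused.setTime(DateWritable.daysToMillis(days));
    System.out.println(reused);                    // prints the corresponding calendar date
  }
}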
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Tue Jul 30 22:22:35 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -41,12 +42,13 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
@@ -988,6 +990,46 @@ class WriterImpl implements Writer, Memo
}
}
+ private static class DateTreeWriter extends TreeWriter {
+ private final RunLengthIntegerWriter writer;
+
+ DateTreeWriter(int columnId,
+ ObjectInspector inspector,
+ StreamFactory writer,
+ boolean nullable) throws IOException {
+ super(columnId, inspector, writer, nullable);
+ PositionedOutputStream out = writer.createStream(id,
+ OrcProto.Stream.Kind.DATA);
+ this.writer = new RunLengthIntegerWriter(out, true);
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void write(Object obj) throws IOException {
+ super.write(obj);
+ if (obj != null) {
+ // Using the Writable here as it's used directly for writing as well as for stats.
+ DateWritable val = ((DateObjectInspector) inspector).getPrimitiveWritableObject(obj);
+ indexStatistics.updateDate(val);
+ writer.write(val.getDays());
+ }
+ }
+
+ @Override
+ void writeStripe(OrcProto.StripeFooter.Builder builder,
+ int requiredIndexEntries) throws IOException {
+ super.writeStripe(builder, requiredIndexEntries);
+ writer.flush();
+ recordPosition(rowIndexPosition);
+ }
+
+ @Override
+ void recordPosition(PositionRecorder recorder) throws IOException {
+ super.recordPosition(recorder);
+ writer.getPosition(recorder);
+ }
+ }
+
private static class DecimalTreeWriter extends TreeWriter {
private final PositionedOutputStream valueStream;
private final RunLengthIntegerWriter scaleStream;
@@ -1262,6 +1304,9 @@ class WriterImpl implements Writer, Memo
case TIMESTAMP:
return new TimestampTreeWriter(streamFactory.getNextColumnId(),
inspector, streamFactory, nullable);
+ case DATE:
+ return new DateTreeWriter(streamFactory.getNextColumnId(),
+ inspector, streamFactory, nullable);
case DECIMAL:
return new DecimalTreeWriter(streamFactory.getNextColumnId(),
inspector, streamFactory, nullable);
@@ -1324,6 +1369,9 @@ class WriterImpl implements Writer, Memo
case TIMESTAMP:
type.setKind(OrcProto.Type.Kind.TIMESTAMP);
break;
+ case DATE:
+ type.setKind(OrcProto.Type.Kind.DATE);
+ break;
case DECIMAL:
type.setKind(OrcProto.Type.Kind.DECIMAL);
break;
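On the write side the same convention applies: DateTreeWriter asks the DateObjectInspector for a DateWritable, records it in the column statistics, and pushes getDays() into a signed integer RLE stream. A short sketch of the Object -> DateWritable -> days conversion the writer relies on (the class name is illustrative):

import java.sql.Date;

import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class DateEncodeSketch {
  public static void main(String[] args) {
    DateObjectInspector oi = PrimitiveObjectInspectorFactory.javaDateObjectInspector;
    Date value = Date.valueOf("2013-07-30");
    DateWritable writable = oi.getPrimitiveWritableObject(value);
    System.out.println(writable.getDays());        // the integer written to the DATA stream
  }
}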
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java Tue Jul 30 22:22:35 2013
@@ -192,7 +192,7 @@ public class BlockMergeTask extends Task
try {
addInputPaths(job, work);
- Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
+ Utilities.setMapWork(job, work, ctx.getMRTmpFileURI());
// remove the pwd from conf file so that job tracker doesn't show this
// logs
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/MergeWork.java Tue Jul 30 22:22:35 2013
@@ -33,14 +33,14 @@ import org.apache.hadoop.hive.ql.io.Comb
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.Explain;
import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.Mapper;
@Explain(displayName = "Block level merge")
-public class MergeWork extends MapredWork implements Serializable {
+public class MergeWork extends MapWork implements Serializable {
private static final long serialVersionUID = 1L;
@@ -70,9 +70,6 @@ public class MergeWork extends MapredWor
if(this.getPathToPartitionInfo() == null) {
this.setPathToPartitionInfo(new LinkedHashMap<String, PartitionDesc>());
}
- if(this.getNumReduceTasks() == null) {
- this.setNumReduceTasks(0);
- }
for(String path: this.inputPaths) {
this.getPathToPartitionInfo().put(path, partDesc);
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java?rev=1508669&r1=1508668&r2=1508669&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java Tue Jul 30 22:22:35 2013
@@ -44,6 +44,7 @@ import org.apache.hadoop.hive.ql.exec.mr
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -181,7 +182,9 @@ public class PartialScanTask extends Tas
try {
addInputPaths(job, work);
- Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
+ MapredWork mrWork = new MapredWork();
+ mrWork.setMapWork(work);
+ Utilities.setMapRedWork(job, mrWork, ctx.getMRTmpFileURI());
// remove the pwd from conf file so that job tracker doesn't show this
// logs
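BlockMergeTask can hand its MergeWork (now a MapWork subclass) straight to Utilities.setMapWork, while PartialScanTask still serializes a full plan and therefore wraps the MapWork in a fresh MapredWork first. A minimal sketch of that wrapping step, assuming only the setters shown above (the helper name is illustrative):

import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;

public class WrapMapWorkSketch {
  // Map-only work carried inside a full MapredWork, as PartialScanTask does above.
  static MapredWork wrap(MapWork mapWork) {
    MapredWork mrWork = new MapredWork();
    mrWork.setMapWork(mapWork);
    return mrWork;
  }
}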