Posted to commits@hive.apache.org by ha...@apache.org on 2013/09/03 20:33:14 UTC
svn commit: r1519788 [1/2] - in /hive/branches/vectorization/ql/src:
java/org/apache/hadoop/hive/ql/exec/ java/org/apache/hadoop/hive/ql/exec/mr/
java/org/apache/hadoop/hive/ql/exec/vector/
java/org/apache/hadoop/hive/ql/exec/vector/expressions/ java/o...
Author: hashutosh
Date: Tue Sep 3 18:33:13 2013
New Revision: 1519788
URL: http://svn.apache.org/r1519788
Log:
HIVE-4959 : Vectorized plan generation should be added as an optimization transform. (Jitendra Nath Pandey via Ashutosh Chauhan)
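For context: physical-plan transforms in Hive hook into the PhysicalOptimizer as PhysicalPlanResolver implementations, which is the shape the new Vectorizer presumably takes. A minimal sketch of that shape (the resolver interface is real Hive API; the body is an illustrative stub, not the committed Vectorizer.java):

import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalPlanResolver;
import org.apache.hadoop.hive.ql.parse.SemanticException;

public class VectorizerSketch implements PhysicalPlanResolver {
  @Override
  public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
    // Walk each MapWork; if every operator, expression, and input format in
    // it supports vectorization, swap in the vector operator tree and mark
    // the work vectorized. Otherwise leave the plan on the row-mode path.
    return pctx;
  }
}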
Added:
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
Removed:
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExecMapper.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
Modified:
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterExprOrExpr.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorFilterOperator.java
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1519788&r1=1519787&r2=1519788&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Tue Sep 3 18:33:13 2013
@@ -87,7 +87,7 @@ public class FileSinkOperator extends Te
protected transient int maxPartitions;
protected transient ListBucketingCtx lbCtx;
protected transient boolean isSkewedStoredAsSubDirectories;
- private transient boolean statsCollectRawDataSize;
+ protected transient boolean statsCollectRawDataSize;
private static final transient String[] FATAL_ERR_MSG = {
@@ -220,6 +220,10 @@ public class FileSinkOperator extends Te
}
}
}
+
+ public Stat getStat() {
+ return stat;
+ }
} // class FSPaths
private static final long serialVersionUID = 1L;
@@ -227,7 +231,7 @@ public class FileSinkOperator extends Te
protected transient Serializer serializer;
protected transient BytesWritable commonKey = new BytesWritable();
protected transient TableIdEnum tabIdEnum = null;
- private transient LongWritable row_count;
+ protected transient LongWritable row_count;
private transient boolean isNativeTable = true;
/**
@@ -236,17 +240,17 @@ public class FileSinkOperator extends Te
* each reducer can write 10 files - this way we effectively get 1000 files.
*/
private transient ExprNodeEvaluator[] partitionEval;
- private transient int totalFiles;
+ protected transient int totalFiles;
private transient int numFiles;
- private transient boolean multiFileSpray;
- private transient final Map<Integer, Integer> bucketMap = new HashMap<Integer, Integer>();
+ protected transient boolean multiFileSpray;
+ protected transient final Map<Integer, Integer> bucketMap = new HashMap<Integer, Integer>();
private transient ObjectInspector[] partitionObjectInspectors;
- private transient HivePartitioner<HiveKey, Object> prtner;
- private transient final HiveKey key = new HiveKey();
+ protected transient HivePartitioner<HiveKey, Object> prtner;
+ protected transient final HiveKey key = new HiveKey();
private transient Configuration hconf;
- private transient FSPaths fsp;
- private transient boolean bDynParts;
+ protected transient FSPaths fsp;
+ protected transient boolean bDynParts;
private transient SubStructObjectInspector subSetOI;
private transient int timeOut; // JT timeout in msec.
private transient long lastProgressReport = System.currentTimeMillis();
@@ -278,7 +282,7 @@ public class FileSinkOperator extends Te
Class<? extends Writable> outputClass;
String taskId;
- private boolean filesCreated = false;
+ protected boolean filesCreated = false;
private void initializeSpecPath() {
// For a query of the type:
@@ -431,7 +435,7 @@ public class FileSinkOperator extends Te
}
}
- private void createBucketFiles(FSPaths fsp) throws HiveException {
+ protected void createBucketFiles(FSPaths fsp) throws HiveException {
try {
int filesIdx = 0;
Set<Integer> seenBuckets = new HashSet<Integer>();
@@ -543,7 +547,7 @@ public class FileSinkOperator extends Te
*
* @return true if a new progress update is reported, false otherwise.
*/
- private boolean updateProgress() {
+ protected boolean updateProgress() {
if (reporter != null &&
(System.currentTimeMillis() - lastProgressReport) > timeOut) {
reporter.progress();
@@ -554,7 +558,7 @@ public class FileSinkOperator extends Te
}
}
- Writable recordValue;
+ protected Writable recordValue;
@Override
public void processOp(Object row, int tag) throws HiveException {
@@ -660,7 +664,7 @@ public class FileSinkOperator extends Te
* @return
* @throws HiveException
*/
- private FSPaths lookupListBucketingPaths(String lbDirName) throws HiveException {
+ protected FSPaths lookupListBucketingPaths(String lbDirName) throws HiveException {
FSPaths fsp2 = valToPaths.get(lbDirName);
if (fsp2 == null) {
fsp2 = createNewPaths(lbDirName);
@@ -698,7 +702,7 @@ public class FileSinkOperator extends Te
* @param row row to process.
* @return directory name.
*/
- private String generateListBucketingDirName(Object row) {
+ protected String generateListBucketingDirName(Object row) {
if (!this.isSkewedStoredAsSubDirectories) {
return null;
}
@@ -739,7 +743,7 @@ public class FileSinkOperator extends Te
return lbDirName;
}
- private FSPaths getDynOutPaths(List<String> row, String lbDirName) throws HiveException {
+ protected FSPaths getDynOutPaths(List<String> row, String lbDirName) throws HiveException {
FSPaths fp;
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1519788&r1=1519787&r2=1519788&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Tue Sep 3 18:33:13 2013
@@ -22,12 +22,17 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
import org.apache.hadoop.hive.ql.plan.CollectDesc;
-import org.apache.hadoop.hive.ql.plan.MuxDesc;
import org.apache.hadoop.hive.ql.plan.DemuxDesc;
import org.apache.hadoop.hive.ql.plan.DummyStoreDesc;
-import org.apache.hadoop.hive.ql.plan.ExtractDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExtractDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.FilterDesc;
import org.apache.hadoop.hive.ql.plan.ForwardDesc;
@@ -39,6 +44,7 @@ import org.apache.hadoop.hive.ql.plan.La
import org.apache.hadoop.hive.ql.plan.LateralViewJoinDesc;
import org.apache.hadoop.hive.ql.plan.LimitDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.MuxDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PTFDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
@@ -106,6 +112,38 @@ public final class OperatorFactory {
MuxOperator.class));
}
+ public static ArrayList<OpTuple> vectorOpvec;
+ static {
+ vectorOpvec = new ArrayList<OpTuple>();
+ vectorOpvec.add(new OpTuple<SelectDesc>(SelectDesc.class, VectorSelectOperator.class));
+ vectorOpvec.add(new OpTuple<GroupByDesc>(GroupByDesc.class, VectorGroupByOperator.class));
+ vectorOpvec.add(new OpTuple<ReduceSinkDesc>(ReduceSinkDesc.class,
+ VectorReduceSinkOperator.class));
+ vectorOpvec.add(new OpTuple<FileSinkDesc>(FileSinkDesc.class, VectorFileSinkOperator.class));
+ vectorOpvec.add(new OpTuple<FilterDesc>(FilterDesc.class, VectorFilterOperator.class));
+ }
+
+ public static <T extends OperatorDesc> Operator<T> getVectorOperator(T conf,
+ VectorizationContext vContext) {
+ Class<T> descClass = (Class<T>) conf.getClass();
+ for (OpTuple o : vectorOpvec) {
+ if (o.descClass == descClass) {
+ try {
+ Operator<T> op = (Operator<T>) o.opClass.getDeclaredConstructor(
+ VectorizationContext.class, OperatorDesc.class).newInstance(
+ vContext, conf);
+ op.initializeCounters();
+ return op;
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ throw new RuntimeException("No vector operator for descriptor class "
+ + descClass.getName());
+ }
+
public static <T extends OperatorDesc> Operator<T> get(Class<T> opClass) {
for (OpTuple o : opvec) {
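A hedged usage sketch of the new getVectorOperator() hook above: the (null, 0) VectorizationContext arguments mirror the validateVectorPath() code removed from ExecDriver below, and the FilterDesc is assumed to come from an already-planned operator tree.

import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
import org.apache.hadoop.hive.ql.plan.FilterDesc;

public final class VectorOpFactoryExample {
  // Returns the vector counterpart of a planned FilterDesc; vectorOpvec maps
  // FilterDesc to VectorFilterOperator, which getVectorOperator() builds
  // reflectively through its (VectorizationContext, OperatorDesc) constructor.
  static Operator<FilterDesc> vectorize(FilterDesc filterDesc) {
    VectorizationContext vContext = new VectorizationContext(null, 0);
    return OperatorFactory.getVectorOperator(filterDesc, vContext);
  }
}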
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1519788&r1=1519787&r2=1519788&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Tue Sep 3 18:33:13 2013
@@ -25,7 +25,6 @@ import java.io.Serializable;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
@@ -57,10 +56,6 @@ import org.apache.hadoop.hive.ql.exec.Pa
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.vector.VectorExecMapper;
-import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
@@ -243,22 +238,7 @@ public class ExecDriver extends Task<Map
//See the javadoc on HiveOutputFormatImpl and HadoopShims.prepareJobOutput()
job.setOutputFormat(HiveOutputFormatImpl.class);
-
- boolean vectorPath = HiveConf.getBoolVar(job,
- HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED);
-
- if (vectorPath) {
- if (validateVectorPath()) {
- LOG.info("Going down the vectorization path");
- job.setMapperClass(VectorExecMapper.class);
- } else {
- //fall back to non-vector mode
- HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
- job.setMapperClass(ExecMapper.class);
- }
- } else {
- job.setMapperClass(ExecMapper.class);
- }
+ job.setMapperClass(ExecMapper.class);
job.setMapOutputKeyClass(HiveKey.class);
job.setMapOutputValueClass(BytesWritable.class);
@@ -510,59 +490,6 @@ public class ExecDriver extends Task<Map
return (returnVal);
}
- private boolean validateVectorPath() {
- LOG.debug("Validating if vectorized execution is applicable");
- MapWork thePlan = this.getWork().getMapWork();
-
- for (String path : thePlan.getPathToPartitionInfo().keySet()) {
- PartitionDesc pd = thePlan.getPathToPartitionInfo().get(path);
- List<Class<?>> interfaceList =
- Arrays.asList(pd.getInputFileFormatClass().getInterfaces());
- if (!interfaceList.contains(VectorizedInputFormatInterface.class)) {
- LOG.debug("Input format: " + pd.getInputFileFormatClassName()
- + ", doesn't provide vectorized input");
- return false;
- }
- }
- VectorizationContext vc = new VectorizationContext(null, 0);
- for (String onefile : thePlan.getPathToAliases().keySet()) {
- List<String> aliases = thePlan.getPathToAliases().get(onefile);
- for (String onealias : aliases) {
- Operator<? extends OperatorDesc> op = thePlan.getAliasToWork().get(
- onealias);
- Operator<? extends OperatorDesc> vectorOp = null;
- try {
- vectorOp = VectorMapOperator.vectorizeOperator(op, vc);
- } catch (Exception e) {
- LOG.debug("Cannot vectorize the plan", e);
- return false;
- }
- if (vectorOp == null) {
- LOG.debug("Cannot vectorize the plan");
- return false;
- }
- //verify the expressions contained in the operators
- try {
- validateVectorOperator(vectorOp);
- } catch (HiveException e) {
- LOG.debug("Cannot vectorize the plan", e);
- return false;
- }
- }
- }
- return true;
- }
-
- private void validateVectorOperator(Operator<? extends OperatorDesc> vectorOp)
- throws HiveException {
- vectorOp.initialize(job, null);
- if (vectorOp.getChildOperators() != null) {
- for (Operator<? extends OperatorDesc> vop : vectorOp.getChildOperators()) {
- validateVectorOperator(vop);
- }
- }
- }
-
private void handleSampling(DriverContext context, MapWork mWork, JobConf job, HiveConf conf)
throws Exception {
assert mWork.getAliasToWork().keySet().size() == 1;
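The net effect of this ExecDriver hunk: the mapper class is no longer chosen at job-submission time. ExecMapper is always used, and the per-plan vectorization decision moves into the compile-time Vectorizer transform, which records it on the MapWork. A hypothetical sketch of the resulting run-time check (the accessor name is an assumption; MapWork.java is modified in this commit but its hunk is not shown here):

if (mapWork.getVectorMode()) {  // assumed MapWork accessor, set by Vectorizer
  // operator tree already consists of vector operators; process row batches
} else {
  // unchanged row-mode execution
}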
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java?rev=1519788&r1=1519787&r2=1519788&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java Tue Sep 3 18:33:13 2013
@@ -19,553 +19,40 @@
package org.apache.hadoop.hive.ql.exec.vector;
import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Stack;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
-import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
-import org.apache.hadoop.hive.ql.exec.JobCloseFeedBack;
-import org.apache.hadoop.hive.ql.exec.Stat;
-import org.apache.hadoop.hive.ql.exec.TerminalOperator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
+
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
-import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
-import org.apache.hadoop.hive.ql.io.HiveKey;
-import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
-import org.apache.hadoop.hive.ql.io.HivePartitioner;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
-import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.plan.SkewedColumnPositionPair;
-import org.apache.hadoop.hive.ql.plan.api.OperatorType;
-import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.hive.ql.stats.StatsSetupConst;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
-import org.apache.hadoop.hive.serde2.Serializer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector;
-import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.ReflectionUtils;
/**
* File Sink operator implementation.
**/
-public class VectorFileSinkOperator extends TerminalOperator<FileSinkDesc> implements
- Serializable {
+public class VectorFileSinkOperator extends FileSinkOperator {
- protected transient HashMap<String, FSPaths> valToPaths;
- protected transient int numDynParts;
- protected transient List<String> dpColNames;
- protected transient DynamicPartitionCtx dpCtx;
- protected transient boolean isCompressed;
- protected transient Path parent;
- protected transient HiveOutputFormat<?, ?> hiveOutputFormat;
- protected transient Path specPath;
- protected transient String childSpecPathDynLinkedPartitions;
- protected transient int dpStartCol; // start column # for DP columns
- protected transient List<String> dpVals; // array of values corresponding to DP columns
- protected transient List<Object> dpWritables;
- protected transient RecordWriter[] rowOutWriters; // row specific RecordWriters
- protected transient int maxPartitions;
- protected transient ListBucketingCtx lbCtx;
- protected transient boolean isSkewedStoredAsSubDirectories;
- private transient boolean statsCollectRawDataSize;
-
- private static final transient String[] FATAL_ERR_MSG = {
- null, // counter value 0 means no error
- "Number of dynamic partitions exceeded hive.exec.max.dynamic.partitions" +
- ".pernode."
- };
- private final VectorizationContext vContext;
+ private static final long serialVersionUID = 1L;
public VectorFileSinkOperator(VectorizationContext context,
OperatorDesc conf) {
- this.vContext = context;
+ super();
this.conf = (FileSinkDesc) conf;
}
- public class FSPaths implements Cloneable {
- Path tmpPath;
- Path taskOutputTempPath;
- Path[] outPaths;
- Path[] finalPaths;
- RecordWriter[] outWriters;
- Stat stat;
-
- public FSPaths() {
- }
-
- public FSPaths(Path specPath) {
- tmpPath = Utilities.toTempPath(specPath);
- taskOutputTempPath = Utilities.toTaskTempPath(specPath);
- outPaths = new Path[numFiles];
- finalPaths = new Path[numFiles];
- outWriters = new RecordWriter[numFiles];
- stat = new Stat();
- }
-
- /**
- * Update OutPath according to tmpPath.
- */
- public Path getTaskOutPath(String taskId) {
- return getOutPath(taskId, this.taskOutputTempPath);
- }
-
-
- /**
- * Update OutPath according to tmpPath.
- */
- public Path getOutPath(String taskId) {
- return getOutPath(taskId, this.tmpPath);
- }
-
- /**
- * Update OutPath according to tmpPath.
- */
- public Path getOutPath(String taskId, Path tmp) {
- return new Path(tmp, Utilities.toTempPath(taskId));
- }
-
- /**
- * Update the final paths according to tmpPath.
- */
- public Path getFinalPath(String taskId) {
- return getFinalPath(taskId, this.tmpPath, null);
- }
-
- /**
- * Update the final paths according to tmpPath.
- */
- public Path getFinalPath(String taskId, Path tmpPath, String extension) {
- if (extension != null) {
- return new Path(tmpPath, taskId + extension);
- } else {
- return new Path(tmpPath, taskId);
- }
- }
-
- public void setOutWriters(RecordWriter[] out) {
- outWriters = out;
- }
-
- public RecordWriter[] getOutWriters() {
- return outWriters;
- }
-
- public void closeWriters(boolean abort) throws HiveException {
- for (int idx = 0; idx < outWriters.length; idx++) {
- if (outWriters[idx] != null) {
- try {
- outWriters[idx].close(abort);
- updateProgress();
- } catch (IOException e) {
- throw new HiveException(e);
- }
- }
- }
- }
-
- private void commit(FileSystem fs) throws HiveException {
- for (int idx = 0; idx < outPaths.length; ++idx) {
- try {
- if ((bDynParts || isSkewedStoredAsSubDirectories)
- && !fs.exists(finalPaths[idx].getParent())) {
- fs.mkdirs(finalPaths[idx].getParent());
- }
- if (!fs.rename(outPaths[idx], finalPaths[idx])) {
- throw new HiveException("Unable to rename output from: " +
- outPaths[idx] + " to: " + finalPaths[idx]);
- }
- updateProgress();
- } catch (IOException e) {
- throw new HiveException("Unable to rename output from: " +
- outPaths[idx] + " to: " + finalPaths[idx], e);
- }
- }
- }
-
- public void abortWriters(FileSystem fs, boolean abort, boolean delete) throws HiveException {
- for (int idx = 0; idx < outWriters.length; idx++) {
- if (outWriters[idx] != null) {
- try {
- outWriters[idx].close(abort);
- if (delete) {
- fs.delete(outPaths[idx], true);
- }
- updateProgress();
- } catch (IOException e) {
- throw new HiveException(e);
- }
- }
- }
- }
- } // class FSPaths
-
- private static final long serialVersionUID = 1L;
- protected transient FileSystem fs;
- protected transient Serializer serializer;
- protected transient BytesWritable commonKey = new BytesWritable();
- protected transient TableIdEnum tabIdEnum = null;
- private transient LongWritable row_count;
- private transient boolean isNativeTable = true;
-
- /**
- * The evaluators for the multiFile sprayer. If the table under consideration has 1000 buckets,
- * it is not a good idea to start so many reducers - if the maximum number of reducers is 100,
- * each reducer can write 10 files - this way we effectively get 1000 files.
- */
- private transient ExprNodeEvaluator[] partitionEval;
- private transient int totalFiles;
- private transient int numFiles;
- private transient boolean multiFileSpray;
- private transient final Map<Integer, Integer> bucketMap = new HashMap<Integer, Integer>();
-
- private transient ObjectInspector[] partitionObjectInspectors;
- private transient HivePartitioner<HiveKey, Object> prtner;
- private transient final HiveKey key = new HiveKey();
- private transient Configuration hconf;
- private transient FSPaths fsp;
- private transient boolean bDynParts;
- private transient SubStructObjectInspector subSetOI;
- private transient int timeOut; // JT timeout in msec.
- private transient long lastProgressReport = System.currentTimeMillis();
-
- /**
- * TableIdEnum.
- *
- */
- public static enum TableIdEnum {
- TABLE_ID_1_ROWCOUNT,
- TABLE_ID_2_ROWCOUNT,
- TABLE_ID_3_ROWCOUNT,
- TABLE_ID_4_ROWCOUNT,
- TABLE_ID_5_ROWCOUNT,
- TABLE_ID_6_ROWCOUNT,
- TABLE_ID_7_ROWCOUNT,
- TABLE_ID_8_ROWCOUNT,
- TABLE_ID_9_ROWCOUNT,
- TABLE_ID_10_ROWCOUNT,
- TABLE_ID_11_ROWCOUNT,
- TABLE_ID_12_ROWCOUNT,
- TABLE_ID_13_ROWCOUNT,
- TABLE_ID_14_ROWCOUNT,
- TABLE_ID_15_ROWCOUNT;
- }
-
- protected transient boolean autoDelete = false;
- protected transient JobConf jc;
- Class<? extends Writable> outputClass;
- String taskId;
-
- private boolean filesCreated = false;
-
- private void initializeSpecPath() {
- // For a query of the type:
- // insert overwrite table T1
- // select * from (subq1 union all subq2)u;
- // subQ1 and subQ2 write to directories Parent/Child_1 and
- // Parent/Child_2 respectively, and union is removed.
- // The movetask that follows subQ1 and subQ2 tasks moves the directory
- // 'Parent'
-
- // However, if the above query contains dynamic partitions, subQ1 and
- // subQ2 have to write to directories: Parent/DynamicPartition/Child_1
- // and Parent/DynamicPartition/Child_1 respectively.
- // The movetask that follows subQ1 and subQ2 tasks still moves the directory
- // 'Parent'
- if ((!conf.isLinkedFileSink()) || (dpCtx == null)) {
- specPath = new Path(conf.getDirName());
- childSpecPathDynLinkedPartitions = null;
- return;
- }
-
- specPath = new Path(conf.getParentDir());
- childSpecPathDynLinkedPartitions = Utilities.getFileNameFromDirName(conf.getDirName());
- }
-
- @Override
- protected void initializeOp(Configuration hconf) throws HiveException {
- try {
- this.hconf = hconf;
- filesCreated = false;
- isNativeTable = !conf.getTableInfo().isNonNative();
- multiFileSpray = conf.isMultiFileSpray();
- totalFiles = conf.getTotalFiles();
- numFiles = conf.getNumFiles();
- dpCtx = conf.getDynPartCtx();
- lbCtx = conf.getLbCtx();
- valToPaths = new HashMap<String, FSPaths>();
- taskId = Utilities.getTaskId(hconf);
- initializeSpecPath();
- fs = specPath.getFileSystem(hconf);
- hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
- isCompressed = conf.getCompressed();
- parent = Utilities.toTempPath(conf.getDirName());
- statsCollectRawDataSize = conf.isStatsCollectRawDataSize();
-
- serializer = (Serializer) conf.getTableInfo().getDeserializerClass().newInstance();
- serializer.initialize(null, conf.getTableInfo().getProperties());
- outputClass = serializer.getSerializedClass();
-
- // Timeout is chosen to make sure that even if one iteration takes more than
- // half of the script.timeout but less than script.timeout, we will still
- // be able to report progress.
- timeOut = hconf.getInt("mapred.healthChecker.script.timeout", 600000) / 2;
-
- if (hconf instanceof JobConf) {
- jc = (JobConf) hconf;
- } else {
- // test code path
- jc = new JobConf(hconf, ExecDriver.class);
- }
-
- if (multiFileSpray) {
- partitionEval = new ExprNodeEvaluator[conf.getPartitionCols().size()];
- int i = 0;
- for (ExprNodeDesc e : conf.getPartitionCols()) {
- partitionEval[i++] = ExprNodeEvaluatorFactory.get(e);
- }
-
- partitionObjectInspectors = initEvaluators(partitionEval, outputObjInspector);
- prtner = (HivePartitioner<HiveKey, Object>) ReflectionUtils.newInstance(
- jc.getPartitionerClass(), null);
- }
- int id = conf.getDestTableId();
- if ((id != 0) && (id <= TableIdEnum.values().length)) {
- String enumName = "TABLE_ID_" + String.valueOf(id) + "_ROWCOUNT";
- tabIdEnum = TableIdEnum.valueOf(enumName);
- row_count = new LongWritable();
- statsMap.put(tabIdEnum, row_count);
- }
-
- if (dpCtx != null) {
- dpSetup();
- }
-
- if (lbCtx != null) {
- lbSetup();
- }
-
- if (!bDynParts) {
- fsp = new FSPaths(specPath);
-
- // Create all the files - this is required because empty files need to be created for
- // empty buckets
- // createBucketFiles(fsp);
- if (!this.isSkewedStoredAsSubDirectories) {
- valToPaths.put("", fsp); // special entry for non-DP case
- }
- }
-
- initializeChildren(hconf);
- } catch (HiveException e) {
- throw e;
- } catch (Exception e) {
- e.printStackTrace();
- throw new HiveException(e);
- }
- }
-
- /**
- * Initialize list bucketing information
- */
- private void lbSetup() {
- this.isSkewedStoredAsSubDirectories = ((lbCtx == null) ? false : lbCtx.isSkewedStoredAsDir());
- }
-
- /**
- * Set up for dynamic partitioning including a new ObjectInspector for the output row.
- */
- private void dpSetup() {
-
- this.bDynParts = false;
- this.numDynParts = dpCtx.getNumDPCols();
- this.dpColNames = dpCtx.getDPColNames();
- this.maxPartitions = dpCtx.getMaxPartitionsPerNode();
-
- assert numDynParts == dpColNames.size() : "number of dynamic paritions should be the same as the size of DP mapping";
-
- if (dpColNames != null && dpColNames.size() > 0) {
- this.bDynParts = true;
- assert inputObjInspectors.length == 1 : "FileSinkOperator should have 1 parent, but it has "
- + inputObjInspectors.length;
- StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[0];
- // remove the last dpMapping.size() columns from the OI
- List<? extends StructField> fieldOI = soi.getAllStructFieldRefs();
- ArrayList<ObjectInspector> newFieldsOI = new ArrayList<ObjectInspector>();
- ArrayList<String> newFieldsName = new ArrayList<String>();
- this.dpStartCol = 0;
- for (StructField sf : fieldOI) {
- String fn = sf.getFieldName();
- if (!dpCtx.getInputToDPCols().containsKey(fn)) {
- newFieldsOI.add(sf.getFieldObjectInspector());
- newFieldsName.add(sf.getFieldName());
- this.dpStartCol++;
- }
- }
- assert newFieldsOI.size() > 0 : "new Fields ObjectInspector is empty";
-
- this.subSetOI = new SubStructObjectInspector(soi, 0, this.dpStartCol);
- this.dpVals = new ArrayList<String>(numDynParts);
- this.dpWritables = new ArrayList<Object>(numDynParts);
- }
- }
-
- private void createBucketFiles(FSPaths fsp) throws HiveException {
- try {
- int filesIdx = 0;
- Set<Integer> seenBuckets = new HashSet<Integer>();
- for (int idx = 0; idx < totalFiles; idx++) {
- if (this.getExecContext() != null && this.getExecContext().getFileId() != null) {
- LOG.info("replace taskId from execContext ");
-
- taskId = Utilities.replaceTaskIdFromFilename(taskId, this.getExecContext().getFileId());
-
- LOG.info("new taskId: FS " + taskId);
-
- assert !multiFileSpray;
- assert totalFiles == 1;
- }
-
- if (multiFileSpray) {
- key.setHashCode(idx);
-
- // Does this hashcode belong to this reducer
- int numReducers = totalFiles / numFiles;
-
- if (numReducers > 1) {
- int currReducer = Integer.valueOf(Utilities.getTaskIdFromFilename(Utilities
- .getTaskId(hconf)));
-
- int reducerIdx = prtner.getPartition(key, null, numReducers);
- if (currReducer != reducerIdx) {
- continue;
- }
- }
-
- int bucketNum = prtner.getBucket(key, null, totalFiles);
- if (seenBuckets.contains(bucketNum)) {
- continue;
- }
- seenBuckets.add(bucketNum);
-
- bucketMap.put(bucketNum, filesIdx);
- taskId = Utilities.replaceTaskIdFromFilename(Utilities.getTaskId(hconf), bucketNum);
- }
- if (isNativeTable) {
- fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId);
- LOG.info("Final Path: FS " + fsp.finalPaths[filesIdx]);
- fsp.outPaths[filesIdx] = fsp.getTaskOutPath(taskId);
- LOG.info("Writing to temp file: FS " + fsp.outPaths[filesIdx]);
- } else {
- fsp.finalPaths[filesIdx] = fsp.outPaths[filesIdx] = specPath;
- }
- try {
- // The reason to keep these instead of using
- // OutputFormat.getRecordWriter() is that
- // getRecordWriter does not give us enough control over the file name that
- // we create.
- String extension = Utilities.getFileExtension(jc, isCompressed,
- hiveOutputFormat);
- if (!bDynParts && !this.isSkewedStoredAsSubDirectories) {
- fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, parent, extension);
- } else {
- fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, fsp.tmpPath, extension);
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- throw new HiveException(e);
- }
- LOG.info("New Final Path: FS " + fsp.finalPaths[filesIdx]);
-
- if (isNativeTable) {
- try {
- // in recent hadoop versions, use deleteOnExit to clean tmp files.
- autoDelete = ShimLoader.getHadoopShims().fileSystemDeleteOnExit(
- fs, fsp.outPaths[filesIdx]);
- } catch (IOException e) {
- throw new HiveException(e);
- }
- }
-
- Utilities.copyTableJobPropertiesToConf(conf.getTableInfo(), jc);
- // only create bucket files only if no dynamic partitions,
- // buckets of dynamic partitions will be created for each newly created partition
- fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(
- jc, conf.getTableInfo(), outputClass, conf, fsp.outPaths[filesIdx],
- reporter);
- // increment the CREATED_FILES counter
- if (reporter != null) {
- reporter.incrCounter(ProgressCounter.CREATED_FILES, 1);
- }
- filesIdx++;
- }
- assert filesIdx == numFiles;
-
- // in recent hadoop versions, use deleteOnExit to clean tmp files.
- if (isNativeTable) {
- autoDelete = ShimLoader.getHadoopShims().fileSystemDeleteOnExit(fs, fsp.outPaths[0]);
- }
- } catch (HiveException e) {
- throw e;
- } catch (Exception e) {
- e.printStackTrace();
- throw new HiveException(e);
- }
-
- filesCreated = true;
- }
+ public VectorFileSinkOperator() {
- /**
- * Report status to JT so that JT won't kill this task if closing takes too long
- * due to too many files to close and the NN is overloaded.
- *
- * @return true if a new progress update is reported, false otherwise.
- */
- private boolean updateProgress() {
- if (reporter != null &&
- (System.currentTimeMillis() - lastProgressReport) > timeOut) {
- reporter.progress();
- lastProgressReport = System.currentTimeMillis();
- return true;
- } else {
- return false;
- }
}
- Writable recordValue;
-
@Override
public void processOp(Object data, int tag) throws HiveException {
@@ -652,15 +139,15 @@ public class VectorFileSinkOperator exte
}
}
- rowOutWriters = fpaths.outWriters;
+ rowOutWriters = fpaths.getOutWriters();
if (conf.isGatherStats()) {
if (statsCollectRawDataSize) {
SerDeStats stats = serializer.getSerDeStats();
if (stats != null) {
- fpaths.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
+ fpaths.getStat().addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
}
}
- fpaths.stat.addToStat(StatsSetupConst.ROW_COUNT, 1);
+ fpaths.getStat().addToStat(StatsSetupConst.ROW_COUNT, 1);
}
@@ -682,417 +169,4 @@ public class VectorFileSinkOperator exte
}
}
}
-
- /**
- * Generate list bucketing directory name from a row.
- * @param row row to process.
- * @return directory name.
- */
- private String generateListBucketingDirName(Object row) {
- if (!this.isSkewedStoredAsSubDirectories) {
- return null;
- }
-
- String lbDirName = null;
- List<Object> standObjs = new ArrayList<Object>();
- List<String> skewedCols = lbCtx.getSkewedColNames();
- List<List<String>> allSkewedVals = lbCtx.getSkewedColValues();
- List<String> skewedValsCandidate = null;
- Map<List<String>, String> locationMap = lbCtx.getLbLocationMap();
-
- /* Convert input row to standard objects. */
- ObjectInspectorUtils.copyToStandardObject(standObjs, row,
- (StructObjectInspector) inputObjInspectors[0], ObjectInspectorCopyOption.WRITABLE);
-
- assert (standObjs.size() >= skewedCols.size()) :
- "The row has less number of columns than no. of skewed column.";
-
- skewedValsCandidate = new ArrayList<String>(skewedCols.size());
- for (SkewedColumnPositionPair posPair : lbCtx.getRowSkewedIndex()) {
- skewedValsCandidate.add(posPair.getSkewColPosition(),
- standObjs.get(posPair.getTblColPosition()).toString());
- }
- /* The row matches skewed column names. */
- if (allSkewedVals.contains(skewedValsCandidate)) {
- /* matches skewed values. */
- lbDirName = FileUtils.makeListBucketingDirName(skewedCols, skewedValsCandidate);
- locationMap.put(skewedValsCandidate, lbDirName);
- } else {
- /* create default directory. */
- lbDirName = FileUtils.makeDefaultListBucketingDirName(skewedCols,
- lbCtx.getDefaultDirName());
- List<String> defaultKey = Arrays.asList(lbCtx.getDefaultKey());
- if (!locationMap.containsKey(defaultKey)) {
- locationMap.put(defaultKey, lbDirName);
- }
- }
- return lbDirName;
- }
-
- /**
- * Lookup list bucketing path.
- * @param lbDirName
- * @return
- * @throws HiveException
- */
- private FSPaths lookupListBucketingPaths(String lbDirName) throws HiveException {
- FSPaths fsp2 = valToPaths.get(lbDirName);
- if (fsp2 == null) {
- fsp2 = createNewPaths(lbDirName);
- }
- return fsp2;
- }
-
- /**
- * create new path.
- *
- * @param dirName
- * @return
- * @throws HiveException
- */
- private FSPaths createNewPaths(String dirName) throws HiveException {
- FSPaths fsp2 = new FSPaths(specPath);
- if (childSpecPathDynLinkedPartitions != null) {
- fsp2.tmpPath = new Path(fsp2.tmpPath,
- dirName + Path.SEPARATOR + childSpecPathDynLinkedPartitions);
- fsp2.taskOutputTempPath =
- new Path(fsp2.taskOutputTempPath,
- dirName + Path.SEPARATOR + childSpecPathDynLinkedPartitions);
- } else {
- fsp2.tmpPath = new Path(fsp2.tmpPath, dirName);
- fsp2.taskOutputTempPath =
- new Path(fsp2.taskOutputTempPath, dirName);
- }
- createBucketFiles(fsp2);
- valToPaths.put(dirName, fsp2);
- return fsp2;
- }
-
- private FSPaths getDynOutPaths(List<String> row, String lbDirName) throws HiveException {
-
- FSPaths fp;
-
- // get the path corresponding to the dynamic partition columns,
- String dpDir = getDynPartDirectory(row, dpColNames, numDynParts);
-
- if (dpDir != null) {
- dpDir = appendListBucketingDirName(lbDirName, dpDir);
- FSPaths fsp2 = valToPaths.get(dpDir);
-
- if (fsp2 == null) {
- // check # of dp
- if (valToPaths.size() > maxPartitions) {
- // throw fatal error
- incrCounter(fatalErrorCntr, 1);
- fatalError = true;
- LOG.error("Fatal error was thrown due to exceeding number of dynamic partitions");
- }
- fsp2 = createNewPaths(dpDir);
- }
- fp = fsp2;
- } else {
- fp = fsp;
- }
- return fp;
- }
-
- /**
- * Append list bucketing dir name to original dir name.
- * Skewed columns cannot be partitioned columns.
- * @param lbDirName
- * @param dpDir
- * @return
- */
- private String appendListBucketingDirName(String lbDirName, String dpDir) {
- StringBuilder builder = new StringBuilder(dpDir);
- dpDir = (lbDirName == null) ? dpDir : builder.append(Path.SEPARATOR).append(lbDirName)
- .toString();
- return dpDir;
- }
-
- // given the current input row, the mapping for input col info to dp columns, and # of dp cols,
- // return the relative path corresponding to the row.
- // e.g., ds=2008-04-08/hr=11
- private String getDynPartDirectory(List<String> row, List<String> dpColNames, int numDynParts) {
- assert row.size() == numDynParts && numDynParts == dpColNames.size() : "data length is different from num of DP columns";
- return FileUtils.makePartName(dpColNames, row);
- }
-
- @Override
- protected void fatalErrorMessage(StringBuilder errMsg, long counterCode) {
- errMsg.append("Operator ").append(getOperatorId()).append(" (id=").append(id).append("): ");
- errMsg.append(counterCode > FATAL_ERR_MSG.length - 1 ?
- "fatal error." :
- FATAL_ERR_MSG[(int) counterCode]);
- // number of partitions exceeds limit, list all the partition names
- if (counterCode > 0) {
- errMsg.append(lsDir());
- }
- }
-
- // sample the partitions that are generated so that users have a sense of what's causing the error
- private String lsDir() {
- String specPath = conf.getDirName();
- // need to get a JobConf here because it's not passed through at client side
- JobConf jobConf = new JobConf(ExecDriver.class);
- Path tmpPath = Utilities.toTempPath(specPath);
- StringBuilder sb = new StringBuilder("\n");
- try {
- DynamicPartitionCtx dpCtx = conf.getDynPartCtx();
- int numDP = dpCtx.getNumDPCols();
- FileSystem fs = tmpPath.getFileSystem(jobConf);
- int level = numDP;
- if (conf.isLinkedFileSink()) {
- level++;
- }
- FileStatus[] status = Utilities.getFileStatusRecurse(tmpPath, level, fs);
- sb.append("Sample of ")
- .append(Math.min(status.length, 100))
- .append(" partitions created under ")
- .append(tmpPath.toString())
- .append(":\n");
- for (int i = 0; i < status.length; ++i) {
- sb.append("\t.../");
- sb.append(getPartitionSpec(status[i].getPath(), numDP))
- .append("\n");
- }
- sb.append("...\n");
- } catch (Exception e) {
- // cannot get the subdirectories, just return the root directory
- sb.append(tmpPath).append("...\n").append(e.getMessage());
- e.printStackTrace();
- } finally {
- return sb.toString();
- }
- }
-
- private String getPartitionSpec(Path path, int level) {
- Stack<String> st = new Stack<String>();
- Path p = path;
- for (int i = 0; i < level; ++i) {
- st.push(p.getName());
- p = p.getParent();
- }
- StringBuilder sb = new StringBuilder();
- while (!st.empty()) {
- sb.append(st.pop());
- }
- return sb.toString();
- }
-
- @Override
- public void closeOp(boolean abort) throws HiveException {
- if (!bDynParts && !filesCreated) {
- createBucketFiles(fsp);
- }
-
- lastProgressReport = System.currentTimeMillis();
- if (!abort) {
- for (FSPaths fsp : valToPaths.values()) {
- fsp.closeWriters(abort);
- if (isNativeTable) {
- fsp.commit(fs);
- }
- }
- // Only publish stats if this operator's flag was set to gather stats
- if (conf.isGatherStats()) {
- publishStats();
- }
- } else {
- // Will come here if an Exception was thrown in map() or reduce().
- // Hadoop always call close() even if an Exception was thrown in map() or
- // reduce().
- for (FSPaths fsp : valToPaths.values()) {
- fsp.abortWriters(fs, abort, !autoDelete && isNativeTable);
- }
- }
- }
-
- /**
- * @return the name of the operator
- */
- @Override
- public String getName() {
- return getOperatorName();
- }
-
- static public String getOperatorName() {
- return "FS";
- }
-
- @Override
- public void jobCloseOp(Configuration hconf, boolean success, JobCloseFeedBack feedBack)
- throws HiveException {
- try {
- if ((conf != null) && isNativeTable) {
- String specPath = conf.getDirName();
- DynamicPartitionCtx dpCtx = conf.getDynPartCtx();
- if (conf.isLinkedFileSink() && (dpCtx != null)) {
- specPath = conf.getParentDir();
- }
- Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, conf,
- reporter);
- }
- } catch (IOException e) {
- throw new HiveException(e);
- }
- super.jobCloseOp(hconf, success, feedBack);
- }
-
- @Override
- public OperatorType getType() {
- return OperatorType.FILESINK;
- }
-
- @Override
- public void augmentPlan() {
- PlanUtils.configureOutputJobPropertiesForStorageHandler(
- getConf().getTableInfo());
- }
-
- public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
- if (hiveOutputFormat == null) {
- try {
- hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
- } catch (Exception ex) {
- throw new IOException(ex);
- }
- }
- Utilities.copyTableJobPropertiesToConf(conf.getTableInfo(), job);
-
- if (conf.getTableInfo().isNonNative()) {
- //check the ouput specs only if it is a storage handler (native tables's outputformats does
- //not set the job's output properties correctly)
- try {
- hiveOutputFormat.checkOutputSpecs(ignored, job);
- } catch (NoSuchMethodError e) {
- //For BC, ignore this for now, but leave a log message
- LOG.warn("HiveOutputFormat should implement checkOutputSpecs() method`");
- }
- }
- }
-
- private void publishStats() throws HiveException {
- boolean isStatsReliable = conf.isStatsReliable();
-
- // Initializing a stats publisher
- StatsPublisher statsPublisher = Utilities.getStatsPublisher(jc);
-
- if (statsPublisher == null) {
- // just return, stats gathering should not block the main query
- LOG.error("StatsPublishing error: StatsPublisher is not initialized.");
- if (isStatsReliable) {
- throw new HiveException(ErrorMsg.STATSPUBLISHER_NOT_OBTAINED.getErrorCodedMsg());
- }
- return;
- }
-
- if (!statsPublisher.connect(hconf)) {
- // just return, stats gathering should not block the main query
- LOG.error("StatsPublishing error: cannot connect to database");
- if (isStatsReliable) {
- throw new HiveException(ErrorMsg.STATSPUBLISHER_CONNECTION_ERROR.getErrorCodedMsg());
- }
- return;
- }
-
- String taskID = Utilities.getTaskIdFromFilename(Utilities.getTaskId(hconf));
- String spSpec = conf.getStaticSpec() != null ? conf.getStaticSpec() : "";
-
- for (String fspKey : valToPaths.keySet()) {
- FSPaths fspValue = valToPaths.get(fspKey);
- String key;
-
- // construct the key(fileID) to insert into the intermediate stats table
- if (fspKey == "") {
- // for non-partitioned/static partitioned table, the key for temp storage is
- // common key prefix + static partition spec + taskID
- String keyPrefix = Utilities.getHashedStatsPrefix(
- conf.getStatsAggPrefix() + spSpec, conf.getMaxStatsKeyPrefixLength());
- key = keyPrefix + taskID;
- } else {
- // for partitioned table, the key is
- // common key prefix + static partition spec + DynamicPartSpec + taskID
- key = createKeyForStatsPublisher(taskID, spSpec, fspKey);
- }
- Map<String, String> statsToPublish = new HashMap<String, String>();
- for (String statType : fspValue.stat.getStoredStats()) {
- statsToPublish.put(statType, Long.toString(fspValue.stat.getStat(statType)));
- }
- if (!statsPublisher.publishStat(key, statsToPublish)) {
- // The original exception is lost.
- // Not changing the interface to maintain backward compatibility
- if (isStatsReliable) {
- throw new HiveException(ErrorMsg.STATSPUBLISHER_PUBLISHING_ERROR.getErrorCodedMsg());
- }
- }
- }
- if (!statsPublisher.closeConnection()) {
- // The original exception is lost.
- // Not changing the interface to maintain backward compatibility
- if (isStatsReliable) {
- throw new HiveException(ErrorMsg.STATSPUBLISHER_CLOSING_ERROR.getErrorCodedMsg());
- }
- }
- }
-
- /**
- * This is server side code to create key in order to save statistics to stats database.
- * Client side will read it via StatsTask.java aggregateStats().
- * Client side reads it via db query prefix which is based on partition spec.
- * Since store-as-subdir information is not part of partition spec, we have to
- * remove store-as-subdir information from variable "keyPrefix" calculation.
- * But we have to keep store-as-subdir information in variable "key" calculation
- * since each skewed value has a row in stats db and "key" is db key,
- * otherwise later value overwrites previous value.
- * Performance impact due to string handling is minimum since this method is
- * only called once in FileSinkOperator closeOp().
- * For example,
- * create table test skewed by (key, value) on (('484','val_484') stored as DIRECTORIES;
- * skewedValueDirList contains 2 elements:
- * 1. key=484/value=val_484
- * 2. HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME
- * Case #1: Static partition with store-as-sub-dir
- * spSpec has SP path
- * fspKey has either
- * key=484/value=val_484 or
- * HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME
- * After filter, fspKey is empty, storedAsDirPostFix has either
- * key=484/value=val_484 or
- * HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME
- * so, at the end, "keyPrefix" doesnt have subdir information but "key" has
- * Case #2: Dynamic partition with store-as-sub-dir. Assume dp part is hr
- * spSpec has SP path
- * fspKey has either
- * hr=11/key=484/value=val_484 or
- * hr=11/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME
- * After filter, fspKey is hr=11, storedAsDirPostFix has either
- * key=484/value=val_484 or
- * HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME/HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME
- * so, at the end, "keyPrefix" doesn't have subdir information from skewed but "key" has
- * @param taskID
- * @param spSpec
- * @param fspKey
- * @return
- */
- private String createKeyForStatsPublisher(String taskID, String spSpec, String fspKey) {
- String key;
- String newFspKey = fspKey;
- String storedAsDirPostFix = "";
- if (isSkewedStoredAsSubDirectories) {
- List<String> skewedValueDirList = this.lbCtx.getSkewedValuesDirNames();
- for (String dir : skewedValueDirList) {
- newFspKey = newFspKey.replace(dir, "");
- if (!newFspKey.equals(fspKey)) {
- storedAsDirPostFix = dir;
- break;
- }
- }
- }
- String keyPrefix = Utilities.getHashedStatsPrefix(
- conf.getStatsAggPrefix() + spSpec + newFspKey + Path.SEPARATOR,
- conf.getMaxStatsKeyPrefixLength());
- key = keyPrefix + storedAsDirPostFix + taskID;
- return key;
- }
}
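To summarize this large hunk: VectorFileSinkOperator shrinks from a near-complete copy of FileSinkOperator to a thin subclass, relying on the members promoted to protected in the FileSinkOperator hunk above. Its resulting shape, restated from the + lines as a sketch:

import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;

public class VectorFileSinkOperator extends FileSinkOperator {
  private static final long serialVersionUID = 1L;

  public VectorFileSinkOperator(VectorizationContext context, OperatorDesc conf) {
    super();
    this.conf = (FileSinkDesc) conf; // context is accepted for factory symmetry
  }

  public VectorFileSinkOperator() {
  }

  // processOp() is overridden to unpack vectorized rows; everything else,
  // including stats access via the new getStat(), is inherited.
}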
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java?rev=1519788&r1=1519787&r2=1519788&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java Tue Sep 3 18:33:13 2013
@@ -23,6 +23,7 @@ import java.io.Serializable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.ConstantVectorExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -48,31 +49,47 @@ public class VectorFilterOperator extend
}
private final transient LongWritable filtered_count, passed_count;
- private transient VectorExpression conditionEvaluator;
+ private VectorExpression conditionEvaluator = null;
transient int heartbeatInterval;
- private final VectorizationContext vContext;
- public VectorFilterOperator(VectorizationContext ctxt, OperatorDesc conf) {
+ // filterMode is 1 if condition is always true, -1 if always false
+ // and 0 if condition needs to be computed.
+ transient private int filterMode = 0;
+
+ public VectorFilterOperator(VectorizationContext vContext, OperatorDesc conf)
+ throws HiveException {
+ this();
+ vContext.setOperatorType(OperatorType.FILTER);
+ ExprNodeDesc oldExpression = ((FilterDesc) conf).getPredicate();
+ conditionEvaluator = vContext.getVectorExpression(oldExpression);
+ }
+
+ public VectorFilterOperator() {
super();
- this.vContext = ctxt;
filtered_count = new LongWritable();
passed_count = new LongWritable();
this.conf = (FilterDesc) conf;
}
+
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
try {
heartbeatInterval = HiveConf.getIntVar(hconf,
HiveConf.ConfVars.HIVESENDHEARTBEAT);
- ExprNodeDesc oldExpression = conf.getPredicate();
- vContext.setOperatorType(OperatorType.FILTER);
- conditionEvaluator = vContext.getVectorExpression(oldExpression);
statsMap.put(Counter.FILTERED, filtered_count);
statsMap.put(Counter.PASSED, passed_count);
} catch (Throwable e) {
throw new HiveException(e);
}
+ if (conditionEvaluator instanceof ConstantVectorExpression) {
+ ConstantVectorExpression cve = (ConstantVectorExpression) this.conditionEvaluator;
+ if (cve.getLongValue() == 1) {
+ filterMode = 1;
+ } else {
+ filterMode = -1;
+ }
+ }
initializeChildren(hconf);
}
@@ -86,7 +103,18 @@ public class VectorFilterOperator extend
VectorizedRowBatch vrg = (VectorizedRowBatch) row;
//Evaluate the predicate expression
//The selected vector represents selected rows.
- conditionEvaluator.evaluate(vrg);
+ switch (filterMode) {
+ case 0:
+ conditionEvaluator.evaluate(vrg);
+ break;
+ case -1:
+ // All will be filtered out
+ vrg.size = 0;
+ break;
+ case 1:
+ default:
+ // All are selected, do nothing
+ }
if (vrg.size > 0) {
forward(vrg, null);
}
@@ -108,4 +136,12 @@ public class VectorFilterOperator extend
public OperatorType getType() {
return OperatorType.FILTER;
}
+
+ public VectorExpression getConditionEvaluator() {
+ return conditionEvaluator;
+ }
+
+ public void setConditionEvaluator(VectorExpression conditionEvaluator) {
+ this.conditionEvaluator = conditionEvaluator;
+ }
}
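The new no-arg constructor and setConditionEvaluator() setter above exist so a predicate can be injected without a VectorizationContext, which is what the updated TestVectorFilterOperator relies on. A test-style sketch (the predicate argument is assumed to be built elsewhere):

import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;

public final class FilterInjectionExample {
  // Wires a predicate directly into the operator; if the predicate is a
  // ConstantVectorExpression, initializeOp() then folds the filter to
  // filterMode 1 (pass everything) or -1 (drop everything).
  static VectorFilterOperator withPredicate(VectorExpression predicate) {
    VectorFilterOperator vfo = new VectorFilterOperator();
    vfo.setConditionEvaluator(predicate);
    return vfo;
  }
}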
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java?rev=1519788&r1=1519787&r2=1519788&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java Tue Sep 3 18:33:13 2013
@@ -57,21 +57,19 @@ public class VectorGroupByOperator exten
private static final Log LOG = LogFactory.getLog(
VectorGroupByOperator.class.getName());
- private final VectorizationContext vContext;
-
/**
* This is the vector of aggregators. They are stateless and only implement
* the algorithm of how to compute the aggregation. state is kept in the
* aggregation buffers and is our responsibility to match the proper state for each key.
*/
- private transient VectorAggregateExpression[] aggregators;
+ private VectorAggregateExpression[] aggregators;
/**
* Key vector expressions.
*/
- private transient VectorExpression[] keyExpressions;
+ private VectorExpression[] keyExpressions;
- private VectorExpressionWriter[] keyOutputWriters;
+ private transient VectorExpressionWriter[] keyOutputWriters;
/**
* The aggregation buffers to use for the current batch.
@@ -141,10 +139,24 @@ public class VectorGroupByOperator exten
private static final long serialVersionUID = 1L;
- public VectorGroupByOperator(VectorizationContext ctxt, OperatorDesc conf) {
+ public VectorGroupByOperator(VectorizationContext vContext, OperatorDesc conf)
+ throws HiveException {
+ this();
+ GroupByDesc desc = (GroupByDesc) conf;
+ this.conf = desc;
+ vContext.setOperatorType(OperatorType.GROUPBY);
+ List<ExprNodeDesc> keysDesc = desc.getKeys();
+ keyExpressions = vContext.getVectorExpressions(keysDesc);
+ ArrayList<AggregationDesc> aggrDesc = desc.getAggregators();
+ aggregators = new VectorAggregateExpression[aggrDesc.size()];
+ for (int i = 0; i < aggrDesc.size(); ++i) {
+ AggregationDesc aggDesc = aggrDesc.get(i);
+ aggregators[i] = vContext.getAggregatorExpression(aggDesc);
+ }
+ }
+
+ public VectorGroupByOperator() {
super();
- this.vContext = ctxt;
- this.conf = (GroupByDesc) conf;
}
@Override
@@ -152,11 +164,8 @@ public class VectorGroupByOperator exten
List<ObjectInspector> objectInspectors = new ArrayList<ObjectInspector>();
+ List<ExprNodeDesc> keysDesc = conf.getKeys();
try {
- vContext.setOperatorType(OperatorType.GROUPBY);
-
- List<ExprNodeDesc> keysDesc = conf.getKeys();
- keyExpressions = vContext.getVectorExpressions(keysDesc);
keyOutputWriters = new VectorExpressionWriter[keyExpressions.length];
@@ -166,11 +175,8 @@ public class VectorGroupByOperator exten
objectInspectors.add(keyOutputWriters[i].getObjectInspector());
}
- ArrayList<AggregationDesc> aggrDesc = conf.getAggregators();
- aggregators = new VectorAggregateExpression[aggrDesc.size()];
- for (int i = 0; i < aggrDesc.size(); ++i) {
- AggregationDesc desc = aggrDesc.get(i);
- aggregators[i] = vContext.getAggregatorExpression (desc);
+ for (int i = 0; i < aggregators.length; ++i) {
+ aggregators[i].init(conf.getAggregators().get(i));
objectInspectors.add(aggregators[i].getOutputObjectInspector());
}
@@ -215,13 +221,15 @@ public class VectorGroupByOperator exten
maxHashTblMemory = (int)(maxMemory * memoryThreshold);
- LOG.info(String.format("maxMemory:%dMb (%d * %f) fixSize:%d (key:%d agg:%d)",
- maxHashTblMemory/1024/1024,
- maxMemory/1024/1024,
- memoryThreshold,
- fixedHashEntrySize,
- keyWrappersBatch.getKeysFixedSize(),
- aggregationBatchInfo.getAggregatorsFixedSize()));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("maxMemory:%dMb (%d * %f) fixSize:%d (key:%d agg:%d)",
+ maxHashTblMemory/1024/1024,
+ maxMemory/1024/1024,
+ memoryThreshold,
+ fixedHashEntrySize,
+ keyWrappersBatch.getKeysFixedSize(),
+ aggregationBatchInfo.getAggregatorsFixedSize()));
+ }
}
@@ -264,15 +272,16 @@ public class VectorGroupByOperator exten
(int)(numEntriesHashTable * PERCENT_ENTRIES_TO_FLUSH);
int entriesFlushed = 0;
- LOG.info(String.format("Flush %d %s entries:%d fixed:%d variable:%d (used:%dMb max:%dMb)",
- entriesToFlush, all ? "(all)" : "",
- numEntriesHashTable, fixedHashEntrySize, avgVariableSize,
- numEntriesHashTable * (fixedHashEntrySize + avgVariableSize)/1024/1024,
- maxHashTblMemory/1024/1024));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("Flush %d %s entries:%d fixed:%d variable:%d (used:%dMb max:%dMb)",
+ entriesToFlush, all ? "(all)" : "",
+ numEntriesHashTable, fixedHashEntrySize, avgVariableSize,
+ numEntriesHashTable * (fixedHashEntrySize + avgVariableSize)/1024/1024,
+ maxHashTblMemory/1024/1024));
+ }
Object[] forwardCache = new Object[keyExpressions.length + aggregators.length];
if (keyExpressions.length == 0 && mapKeysAggregationBuffers.isEmpty()) {
-
// if this is a global aggregation (no keys) and empty set, must still emit NULLs
VectorAggregationBufferRow emptyBuffers = allocateAggregationBuffer();
for (int i = 0; i < aggregators.length; ++i) {
@@ -280,7 +289,6 @@ public class VectorGroupByOperator exten
}
forward(forwardCache, outputObjInspector);
} else {
-
/* Iterate the global (keywrapper,aggregationbuffers) map and emit
a row for each key */
Iterator<Map.Entry<KeyWrapper, VectorAggregationBufferRow>> iter =
@@ -297,8 +305,10 @@ public class VectorGroupByOperator exten
forwardCache[fi++] = aggregators[i].evaluateOutput(pair.getValue()
.getAggregationBuffer(i));
}
- LOG.debug(String.format("forwarding keys: %s: %s",
- pair.getKey().toString(), Arrays.toString(forwardCache)));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("forwarding keys: %s: %s",
+ pair.getKey().toString(), Arrays.toString(forwardCache)));
+ }
forward(forwardCache, outputObjInspector);
if (!all) {
@@ -441,5 +451,21 @@ public class VectorGroupByOperator exten
return OperatorType.GROUPBY;
}
+ public VectorExpression[] getKeyExpressions() {
+ return keyExpressions;
+ }
+
+ public void setKeyExpressions(VectorExpression[] keyExpressions) {
+ this.keyExpressions = keyExpressions;
+ }
+
+ public VectorAggregateExpression[] getAggregators() {
+ return aggregators;
+ }
+
+ public void setAggregators(VectorAggregateExpression[] aggregators) {
+ this.aggregators = aggregators;
+ }
+
}
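
Taken together, the VectorGroupByOperator changes move expression construction from initializeOp() into the new (VectorizationContext, OperatorDesc) constructor, flip keyExpressions and aggregators from transient to serialized state with bean getters/setters and a no-arg constructor, and keep only runtime-bound state (the output writers) transient for rebuilding at initialization. That shape is exactly what a bean-style serializer needs to round-trip the operator inside a stored plan. A minimal sketch of the contract (illustrative only; java.beans.XMLEncoder and the FakeOperator class below are stand-ins, not Hive code):

// Illustrative only: why serialized operator state needs a no-arg
// constructor plus matching getter/setter pairs. FakeOperator is a
// hypothetical stand-in for a vectorized operator.
import java.beans.XMLDecoder;
import java.beans.XMLEncoder;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

public class BeanRoundTripSketch {
  public static class FakeOperator {
    private String[] keyExpressionNames;  // stands in for VectorExpression[]
    public FakeOperator() { }             // no-arg constructor: required
    public String[] getKeyExpressionNames() { return keyExpressionNames; }
    public void setKeyExpressionNames(String[] v) { keyExpressionNames = v; }
  }

  public static void main(String[] args) {
    FakeOperator op = new FakeOperator();
    op.setKeyExpressionNames(new String[] {"col0", "col1 + 1"});

    // Encode: the encoder records reachable bean properties only.
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (XMLEncoder enc = new XMLEncoder(bos)) {
      enc.writeObject(op);
    }

    // Decode: the decoder calls the no-arg constructor, then the setters.
    try (XMLDecoder dec =
        new XMLDecoder(new ByteArrayInputStream(bos.toByteArray()))) {
      FakeOperator copy = (FakeOperator) dec.readObject();
      System.out.println(copy.getKeyExpressionNames()[1]); // prints: col1 + 1
    }
  }
}

Anything the decoder cannot reach through the no-arg constructor and getter/setter pairs is lost on round-trip, which is why the per-run writers stay transient and are re-derived in initializeOp().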
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java?rev=1519788&r1=1519787&r2=1519788&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java Tue Sep 3 18:33:13 2013
@@ -52,26 +52,24 @@ public class VectorReduceSinkOperator ex
private static final long serialVersionUID = 1L;
- private final VectorizationContext vContext;
-
/**
* The evaluators for the key columns. Key columns decide the sort order on
* the reducer side. Key columns are passed to the reducer in the "key".
*/
- protected transient VectorExpression[] keyEval;
-
+ protected VectorExpression[] keyEval;
+
/**
* The key value writers. These know how to write the necessary writable type
* based on key column metadata, from the primitive vector type.
*/
protected transient VectorExpressionWriter[] keyWriters;
-
+
/**
* The evaluators for the value columns. Value columns are passed to reducer
* in the "value".
*/
- protected transient VectorExpression[] valueEval;
-
+ protected VectorExpression[] valueEval;
+
/**
* The output value writers. These know how to write the necessary writable type
* based on value column metadata, from the primitive vector type.
@@ -83,19 +81,19 @@ public class VectorReduceSinkOperator ex
* Hive language). Partition columns decide the reducer that the current row
* goes to. Partition columns are not passed to reducer.
*/
- protected transient VectorExpression[] partitionEval;
-
+ protected VectorExpression[] partitionEval;
+
/**
* The partition value writers. These know how to write the necessary writable type
* based on partition column metadata, from the primitive vector type.
- */
+ */
protected transient VectorExpressionWriter[] partitionWriters;
- private int numDistributionKeys;
+ private transient int numDistributionKeys;
- private List<List<Integer>> distinctColIndices;
+ private transient List<List<Integer>> distinctColIndices;
- private int numDistinctExprs;
+ private transient int numDistinctExprs;
transient HiveKey keyWritable = new HiveKey();
transient Writable value;
@@ -115,14 +113,24 @@ public class VectorReduceSinkOperator ex
transient ObjectInspector[] partitionObjectInspectors;
transient int [] keyHashCode = new int [VectorizedRowBatch.DEFAULT_SIZE];
+ public VectorReduceSinkOperator(VectorizationContext vContext, OperatorDesc conf)
+ throws HiveException {
+ this();
+ ReduceSinkDesc desc = (ReduceSinkDesc) conf;
+ this.conf = desc;
+ vContext.setOperatorType(OperatorType.REDUCESINK);
+ keyEval = vContext.getVectorExpressions(desc.getKeyCols());
+ valueEval = vContext.getVectorExpressions(desc.getValueCols());
+ partitionEval = vContext.getVectorExpressions(desc.getPartitionCols());
+ }
+
+ public VectorReduceSinkOperator() {
+ super();
+ }
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
try {
- vContext.setOperatorType(OperatorType.REDUCESINK);
- keyEval = vContext.getVectorExpressions(conf.getKeyCols());
- valueEval = vContext.getVectorExpressions(conf.getValueCols());
- partitionEval = vContext.getVectorExpressions(conf.getPartitionCols());
numDistributionKeys = conf.getNumDistributionKeys();
distinctColIndices = conf.getDistinctColumnIndices();
@@ -133,12 +141,12 @@ public class VectorReduceSinkOperator ex
.newInstance();
keySerializer.initialize(null, keyTableDesc.getProperties());
keyIsText = keySerializer.getSerializedClass().equals(Text.class);
-
+
/*
- * Compute and assign the key writers and the key object inspector
+ * Compute and assign the key writers and the key object inspector
*/
VectorExpressionWriterFactory.processVectorExpressions(
- conf.getKeyCols(),
+ conf.getKeyCols(),
conf.getOutputKeyColumnNames(),
new VectorExpressionWriterFactory.Closure() {
@Override
@@ -148,7 +156,7 @@ public class VectorReduceSinkOperator ex
keyObjectInspector = objectInspector;
}
});
-
+
String colNames = "";
for(String colName : conf.getOutputKeyColumnNames()) {
colNames = String.format("%s %s", colNames, colName);
@@ -160,12 +168,12 @@ public class VectorReduceSinkOperator ex
colNames));
partitionWriters = VectorExpressionWriterFactory.getExpressionWriters(conf.getPartitionCols());
-
+
TableDesc valueTableDesc = conf.getValueSerializeInfo();
valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
.newInstance();
valueSerializer.initialize(null, valueTableDesc.getProperties());
-
+
/*
* Compute and assign the value writers and the value object inspector
*/
@@ -323,13 +331,6 @@ public class VectorReduceSinkOperator ex
}
}
- public VectorReduceSinkOperator (
- VectorizationContext context,
- OperatorDesc conf) {
- this.vContext = context;
- this.conf = (ReduceSinkDesc) conf;
- }
-
/**
* @return the name of the operator
*/
@@ -352,4 +353,28 @@ public class VectorReduceSinkOperator ex
return false;
}
+ public VectorExpression[] getPartitionEval() {
+ return partitionEval;
+ }
+
+ public void setPartitionEval(VectorExpression[] partitionEval) {
+ this.partitionEval = partitionEval;
+ }
+
+ public VectorExpression[] getValueEval() {
+ return valueEval;
+ }
+
+ public void setValueEval(VectorExpression[] valueEval) {
+ this.valueEval = valueEval;
+ }
+
+ public VectorExpression[] getKeyEval() {
+ return keyEval;
+ }
+
+ public void setKeyEval(VectorExpression[] keyEval) {
+ this.keyEval = keyEval;
+ }
+
}
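
As with VectorGroupByOperator above, the (VectorizationContext, OperatorDesc) constructor now does the expression compilation (key, value, and partition evaluators) up front, while initializeOp() only builds the transient serializers and writers. Constructors of this shape let a planning-time pass construct the vectorized operator directly from the logical descriptor. A hypothetical dispatch, sketching the idea only (the actual selection logic is not shown in this diff):

// Hypothetical sketch: building vectorized operators from logical
// descriptors via the constructors introduced above. The class name and
// dispatch logic are illustrative, not Hive's actual implementation.
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.SelectDesc;

public class VectorizeDispatchSketch {
  // Returns a vectorized counterpart for the descriptor, or null when the
  // operator kind has no vectorized implementation.
  static Operator<? extends OperatorDesc> vectorize(
      VectorizationContext vContext, OperatorDesc desc) throws HiveException {
    if (desc instanceof GroupByDesc) {
      return new VectorGroupByOperator(vContext, desc);
    } else if (desc instanceof ReduceSinkDesc) {
      return new VectorReduceSinkOperator(vContext, desc);
    } else if (desc instanceof SelectDesc) {
      return new VectorSelectOperator(vContext, desc);
    }
    return null; // not vectorizable under this sketch
  }
}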
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java?rev=1519788&r1=1519787&r2=1519788&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java Tue Sep 3 18:33:13 2013
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.exec.ve
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.Explain;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.SelectDesc;
@@ -43,17 +44,27 @@ public class VectorSelectOperator extend
private static final long serialVersionUID = 1L;
- protected transient VectorExpression[] vExpressions;
+ protected VectorExpression[] vExpressions = null;
- private final VectorizationContext vContext;
+ private transient int [] projectedColumns = null;
- private int [] projectedColumns = null;
+ private transient VectorExpressionWriter [] valueWriters = null;
- private VectorExpressionWriter [] valueWriters = null;
-
- public VectorSelectOperator(VectorizationContext ctxt, OperatorDesc conf) {
- this.vContext = ctxt;
+ public VectorSelectOperator(VectorizationContext vContext, OperatorDesc conf)
+ throws HiveException {
this.conf = (SelectDesc) conf;
+ List<ExprNodeDesc> colList = this.conf.getColList();
+ vContext.setOperatorType(OperatorType.SELECT);
+ vExpressions = new VectorExpression[colList.size()];
+ for (int i = 0; i < colList.size(); i++) {
+ vExpressions[i] = vContext.getVectorExpression(colList.get(i));
+ String columnName = this.conf.getOutputColumnNames().get(i);
+ // Update column map with output column names
+ vContext.addToColumnMap(columnName, vExpressions[i].getOutputColumn());
+ }
+ }
+
+ public VectorSelectOperator() {
}
@Override
@@ -67,14 +78,6 @@ public class VectorSelectOperator extend
List<ObjectInspector> objectInspectors = new ArrayList<ObjectInspector>();
List<ExprNodeDesc> colList = conf.getColList();
- vContext.setOperatorType(OperatorType.SELECT);
- vExpressions = new VectorExpression[colList.size()];
- for (int i = 0; i < colList.size(); i++) {
- vExpressions[i] = vContext.getVectorExpression(colList.get(i));
- String columnName = conf.getOutputColumnNames().get(i);
- // Update column map with output column names
- vContext.addToColumnMap(columnName, vExpressions[i].getOutputColumn());
- }
valueWriters = VectorExpressionWriterFactory.getExpressionWriters(colList);
for (VectorExpressionWriter vew : valueWriters) {
objectInspectors.add(vew.getObjectInspector());
@@ -141,4 +144,21 @@ public class VectorSelectOperator extend
public OperatorType getType() {
return OperatorType.SELECT;
}
+
+ @Explain (displayName = "vector expressions")
+ public VectorExpression[] getvExpressions() {
+ return vExpressions;
+ }
+
+ public VectorExpression[] getVExpressions() {
+ return vExpressions;
+ }
+
+ public void setvExpressions(VectorExpression[] vExpressions) {
+ this.vExpressions = vExpressions;
+ }
+
+ public void setVExpressions(VectorExpression[] vExpressions) {
+ this.vExpressions = vExpressions;
+ }
}
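
Two details in the VectorSelectOperator accessors are worth noting. First, @Explain(displayName = "vector expressions") on getvExpressions() surfaces the compiled vector expressions in EXPLAIN output under that label. Second, the lowercase-v getter/setter pair exists alongside the capitalized one presumably because JavaBeans introspection maps getvExpressions()/setvExpressions() to the property name "vExpressions", matching the field exactly as bean-style plan serialization requires, while getVExpressions() is the conventionally capitalized accessor for callers. A small sketch of the annotation's use (the class and field below are hypothetical, not Hive code):

// Illustrative only: exposing plan-time state in EXPLAIN output via Hive's
// @Explain annotation. The class and field here are hypothetical.
import org.apache.hadoop.hive.ql.plan.Explain;

public class ExplainAnnotationSketch {
  private String[] vectorExpressionNames;

  // EXPLAIN inspects plan objects reflectively and prints the value returned
  // by getters annotated with @Explain under their displayName.
  @Explain(displayName = "vector expressions")
  public String[] getVectorExpressionNames() {
    return vectorExpressionNames;
  }

  public void setVectorExpressionNames(String[] v) {
    vectorExpressionNames = v;
  }
}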