Posted to commits@hive.apache.org by zs...@apache.org on 2010/01/21 11:38:15 UTC
svn commit: r901644 [13/37] - in /hadoop/hive/trunk: ./ ql/src/java/org/apache/hadoop/hive/ql/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ ql/src/java/org/apache/hadoop/hive/ql/history/ ql/src/jav...
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java Thu Jan 21 10:37:58 2010
@@ -43,10 +43,7 @@
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
import org.apache.hadoop.hive.serde2.Deserializer;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.InputFormat;
-
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TMemoryBuffer;
@@ -57,16 +54,17 @@
public class Partition {
@SuppressWarnings("nls")
- static final private Log LOG = LogFactory.getLog("hive.ql.metadata.Partition");
+ static final private Log LOG = LogFactory
+ .getLog("hive.ql.metadata.Partition");
private Table table;
private org.apache.hadoop.hive.metastore.api.Partition tPartition;
-
+
private Deserializer deserializer;
private Properties schema;
private Class<? extends InputFormat> inputFormatClass;
private Class<? extends HiveOutputFormat> outputFormatClass;
-
+
/**
* @return the tPartition
*/
@@ -87,38 +85,44 @@
private Path partPath;
private URI partURI;
- public Partition(Table tbl, org.apache.hadoop.hive.metastore.api.Partition tp) throws HiveException {
+ public Partition(Table tbl, org.apache.hadoop.hive.metastore.api.Partition tp)
+ throws HiveException {
initialize(tbl, tp);
}
/**
* Create partition object with the given info.
- * @param tbl Table the partition will be in.
- * @param partSpec Partition specifications.
- * @param location Location of the partition, relative to the table.
- * @throws HiveException Thrown if we could not create the partition.
+ *
+ * @param tbl
+ * Table the partition will be in.
+ * @param partSpec
+ * Partition specifications.
+ * @param location
+ * Location of the partition, relative to the table.
+ * @throws HiveException
+ * Thrown if we could not create the partition.
*/
- public Partition(Table tbl, Map<String, String> partSpec,
- Path location) throws HiveException {
+ public Partition(Table tbl, Map<String, String> partSpec, Path location)
+ throws HiveException {
List<String> pvals = new ArrayList<String>();
for (FieldSchema field : tbl.getPartCols()) {
String val = partSpec.get(field.getName());
if (val == null) {
- throw new HiveException("partition spec is invalid. field.getName() does not exist in input.");
+ throw new HiveException(
+ "partition spec is invalid. field.getName() does not exist in input.");
}
pvals.add(val);
}
- org.apache.hadoop.hive.metastore.api.Partition tpart =
- new org.apache.hadoop.hive.metastore.api.Partition();
+ org.apache.hadoop.hive.metastore.api.Partition tpart = new org.apache.hadoop.hive.metastore.api.Partition();
tpart.setDbName(tbl.getDbName());
tpart.setTableName(tbl.getName());
tpart.setValues(pvals);
StorageDescriptor sd = new StorageDescriptor();
try {
- //replace with THRIFT-138
+ // replace with THRIFT-138
TMemoryBuffer buffer = new TMemoryBuffer(1024);
TBinaryProtocol prot = new TBinaryProtocol(buffer);
tbl.getTTable().getSd().write(prot);
@@ -141,22 +145,24 @@
/**
* Initializes this object with the given variables
- * @param tbl Table the partition belongs to
- * @param tp Thrift Partition object
- * @throws HiveException Thrown if we cannot initialize the partition
+ *
+ * @param tbl
+ * Table the partition belongs to
+ * @param tp
+ * Thrift Partition object
+ * @throws HiveException
+ * Thrown if we cannot initialize the partition
*/
private void initialize(Table tbl,
- org.apache.hadoop.hive.metastore.api.Partition tp)
- throws HiveException {
+ org.apache.hadoop.hive.metastore.api.Partition tp) throws HiveException {
table = tbl;
tPartition = tp;
partName = "";
- if(tbl.isPartitioned()) {
+ if (tbl.isPartitioned()) {
try {
- partName = Warehouse.makePartName(tbl.getPartCols(),
- tp.getValues());
+ partName = Warehouse.makePartName(tbl.getPartCols(), tp.getValues());
if (tp.getSd().getLocation() == null) {
// set default if location is not set
partPath = new Path(tbl.getDataLocation().toString(), partName);
@@ -187,10 +193,10 @@
return table;
}
- public Path [] getPath() {
- Path [] ret = new Path [1];
+ public Path[] getPath() {
+ Path[] ret = new Path[1];
ret[0] = partPath;
- return(ret);
+ return (ret);
}
public Path getPartitionPath() {
@@ -200,9 +206,9 @@
final public URI getDataLocation() {
return partURI;
}
-
+
final public Deserializer getDeserializer() {
- if(deserializer == null) {
+ if (deserializer == null) {
try {
initSerDe();
} catch (HiveException e) {
@@ -211,34 +217,38 @@
}
return deserializer;
}
-
+
/**
- * @param schema the schema to set
+ * @param schema
+ * the schema to set
*/
public void setSchema(Properties schema) {
this.schema = schema;
}
-
+
public Properties getSchema() {
- if(this.schema == null)
- this.schema = MetaStoreUtils.getSchema(this.getTPartition(), this.getTable().getTTable());
- return this.schema;
+ if (schema == null) {
+ schema = MetaStoreUtils
+ .getSchema(getTPartition(), getTable().getTTable());
+ }
+ return schema;
}
-
+
protected void initSerDe() throws HiveException {
if (deserializer == null) {
try {
- deserializer = MetaStoreUtils.getDeserializer(Hive.get().getConf(), this.getTPartition(), this.getTable().getTTable());
+ deserializer = MetaStoreUtils.getDeserializer(Hive.get().getConf(),
+ getTPartition(), getTable().getTTable());
} catch (MetaException e) {
throw new HiveException(e);
}
}
}
-
+
/**
* @param inputFormatClass
*/
- public void setInputFormatClass(Class<? extends InputFormat > inputFormatClass) {
+ public void setInputFormatClass(Class<? extends InputFormat> inputFormatClass) {
this.inputFormatClass = inputFormatClass;
tPartition.getSd().setInputFormat(inputFormatClass.getName());
}
@@ -247,78 +257,78 @@
* @param class1
*/
public void setOutputFormatClass(Class<?> class1) {
- this.outputFormatClass = HiveFileFormatUtils.getOutputFormatSubstitute(class1);
+ outputFormatClass = HiveFileFormatUtils.getOutputFormatSubstitute(class1);
tPartition.getSd().setOutputFormat(class1.getName());
}
- final public Class<? extends InputFormat> getInputFormatClass() throws HiveException{
- if(inputFormatClass == null) {
- String clsName = getSchema().getProperty(org.apache.hadoop.hive.metastore.api.Constants.FILE_INPUT_FORMAT,
+ final public Class<? extends InputFormat> getInputFormatClass()
+ throws HiveException {
+ if (inputFormatClass == null) {
+ String clsName = getSchema().getProperty(
+ org.apache.hadoop.hive.metastore.api.Constants.FILE_INPUT_FORMAT,
org.apache.hadoop.mapred.SequenceFileInputFormat.class.getName());
- try{
- setInputFormatClass((Class<? extends InputFormat>)Class.forName(clsName, true, JavaUtils.getClassLoader()));
- } catch (ClassNotFoundException e) {
+ try {
+ setInputFormatClass((Class<? extends InputFormat>) Class.forName(
+ clsName, true, JavaUtils.getClassLoader()));
+ } catch (ClassNotFoundException e) {
throw new HiveException("Class not found: " + clsName, e);
}
- }
-
+ }
+
return inputFormatClass;
}
- final public Class<? extends HiveOutputFormat> getOutputFormatClass() throws HiveException {
- if (outputFormatClass == null) {
- String clsName = getSchema().getProperty(org.apache.hadoop.hive.metastore.api.Constants.FILE_OUTPUT_FORMAT,
- HiveSequenceFileOutputFormat.class.getName());
- try{
- setOutputFormatClass(Class.forName(clsName, true, JavaUtils.getClassLoader()));
- } catch (ClassNotFoundException e) {
+ final public Class<? extends HiveOutputFormat> getOutputFormatClass()
+ throws HiveException {
+ if (outputFormatClass == null) {
+ String clsName = getSchema().getProperty(
+ org.apache.hadoop.hive.metastore.api.Constants.FILE_OUTPUT_FORMAT,
+ HiveSequenceFileOutputFormat.class.getName());
+ try {
+ setOutputFormatClass(Class.forName(clsName, true, JavaUtils
+ .getClassLoader()));
+ } catch (ClassNotFoundException e) {
throw new HiveException("Class not found: " + clsName, e);
}
- }
+ }
return outputFormatClass;
}
-
+
/**
- * The number of buckets is a property of the partition. However - internally we are just
- * storing it as a property of the table as a short term measure.
+ * The number of buckets is a property of the partition. However - internally
+ * we are just storing it as a property of the table as a short term measure.
*/
public int getBucketCount() {
return table.getNumBuckets();
/*
- TODO: Keeping this code around for later use when we will support
- sampling on tables which are not created with CLUSTERED INTO clause
-
- // read from table meta data
- int numBuckets = this.table.getNumBuckets();
- if (numBuckets == -1) {
- // table meta data does not have bucket information
- // check if file system has multiple buckets(files) in this partition
- String pathPattern = this.partPath.toString() + "/*";
- try {
- FileSystem fs = FileSystem.get(this.table.getDataLocation(), Hive.get().getConf());
- FileStatus srcs[] = fs.globStatus(new Path(pathPattern));
- numBuckets = srcs.length;
- }
- catch (Exception e) {
- throw new RuntimeException("Cannot get bucket count for table " + this.table.getName(), e);
- }
- }
- return numBuckets;
+ * TODO: Keeping this code around for later use when we will support
+ * sampling on tables which are not created with CLUSTERED INTO clause
+ *
+ * // read from table meta data int numBuckets = this.table.getNumBuckets();
+ * if (numBuckets == -1) { // table meta data does not have bucket
+ * information // check if file system has multiple buckets(files) in this
+ * partition String pathPattern = this.partPath.toString() + "/*"; try {
+ * FileSystem fs = FileSystem.get(this.table.getDataLocation(),
+ * Hive.get().getConf()); FileStatus srcs[] = fs.globStatus(new
+ * Path(pathPattern)); numBuckets = srcs.length; } catch (Exception e) {
+ * throw new RuntimeException("Cannot get bucket count for table " +
+ * this.table.getName(), e); } } return numBuckets;
*/
}
public List<String> getBucketCols() {
- return this.tPartition.getSd().getBucketCols();
+ return tPartition.getSd().getBucketCols();
}
/**
* mapping from bucket number to bucket path
*/
- //TODO: add test case and clean it up
+ // TODO: add test case and clean it up
@SuppressWarnings("nls")
public Path getBucketPath(int bucketNum) {
try {
- FileSystem fs = FileSystem.get(table.getDataLocation(), Hive.get().getConf());
+ FileSystem fs = FileSystem.get(table.getDataLocation(), Hive.get()
+ .getConf());
String pathPattern = partPath.toString();
if (getBucketCount() > 0) {
pathPattern = pathPattern + "/*";
@@ -326,79 +336,84 @@
LOG.info("Path pattern = " + pathPattern);
FileStatus srcs[] = fs.globStatus(new Path(pathPattern));
Arrays.sort(srcs);
- for (FileStatus src: srcs) {
+ for (FileStatus src : srcs) {
LOG.info("Got file: " + src.getPath());
}
- if(srcs.length == 0)
+ if (srcs.length == 0) {
return null;
+ }
return srcs[bucketNum].getPath();
- }
- catch (Exception e) {
- throw new RuntimeException("Cannot get bucket path for bucket " + bucketNum, e);
+ } catch (Exception e) {
+ throw new RuntimeException("Cannot get bucket path for bucket "
+ + bucketNum, e);
}
}
/**
* mapping from a Path to the bucket number if any
*/
- private static Pattern bpattern = Pattern.compile("part-([0-9][0-9][0-9][0-9][0-9])");
+ private static Pattern bpattern = Pattern
+ .compile("part-([0-9][0-9][0-9][0-9][0-9])");
private String partName;
+
@SuppressWarnings("nls")
public static int getBucketNum(Path p) {
Matcher m = bpattern.matcher(p.getName());
- if(m.find()) {
+ if (m.find()) {
String bnum_str = m.group(1);
try {
return (Integer.parseInt(bnum_str));
} catch (NumberFormatException e) {
- throw new RuntimeException("Unexpected error parsing: "+p.getName()+","+bnum_str);
+ throw new RuntimeException("Unexpected error parsing: " + p.getName()
+ + "," + bnum_str);
}
}
return 0;
}
-
@SuppressWarnings("nls")
- public Path [] getPath(Sample s) throws HiveException {
- if(s == null) {
+ public Path[] getPath(Sample s) throws HiveException {
+ if (s == null) {
return getPath();
} else {
int bcount = getBucketCount();
- if(bcount == 0) {
+ if (bcount == 0) {
return getPath();
}
Dimension d = s.getSampleDimension();
- if(!d.getDimensionId().equals(table.getBucketingDimensionId())) {
+ if (!d.getDimensionId().equals(table.getBucketingDimensionId())) {
// if the bucket dimension is not the same as the sampling dimension
// we must scan all the data
return getPath();
}
int scount = s.getSampleFraction();
- ArrayList<Path> ret = new ArrayList<Path> ();
+ ArrayList<Path> ret = new ArrayList<Path>();
- if(bcount == scount) {
- ret.add(getBucketPath(s.getSampleNum()-1));
+ if (bcount == scount) {
+ ret.add(getBucketPath(s.getSampleNum() - 1));
} else if (bcount < scount) {
- if((scount/bcount)*bcount != scount) {
- throw new HiveException("Sample Count"+scount+" is not a multiple of bucket count " +
- bcount + " for table " + table.getName());
+ if ((scount / bcount) * bcount != scount) {
+ throw new HiveException("Sample Count" + scount
+ + " is not a multiple of bucket count " + bcount + " for table "
+ + table.getName());
}
// undersampling a bucket
- ret.add(getBucketPath((s.getSampleNum()-1)%bcount));
+ ret.add(getBucketPath((s.getSampleNum() - 1) % bcount));
} else if (bcount > scount) {
- if((bcount/scount)*scount != bcount) {
- throw new HiveException("Sample Count"+scount+" is not a divisor of bucket count " +
- bcount + " for table " + table.getName());
+ if ((bcount / scount) * scount != bcount) {
+ throw new HiveException("Sample Count" + scount
+ + " is not a divisor of bucket count " + bcount + " for table "
+ + table.getName());
}
// sampling multiple buckets
- for(int i=0; i<bcount/scount; i++) {
- ret.add(getBucketPath(i*scount + (s.getSampleNum()-1)));
+ for (int i = 0; i < bcount / scount; i++) {
+ ret.add(getBucketPath(i * scount + (s.getSampleNum() - 1)));
}
}
- return(ret.toArray(new Path[ret.size()]));
+ return (ret.toArray(new Path[ret.size()]));
}
}
@@ -406,7 +421,6 @@
return spec;
}
-
@SuppressWarnings("nls")
@Override
public String toString() {
@@ -414,7 +428,8 @@
try {
pn = Warehouse.makePartName(spec);
} catch (MetaException e) {
- // ignore as we most probably in an exception path already otherwise this error wouldn't occur
+ // ignore as we most probably in an exception path already otherwise this
+ // error wouldn't occur
}
return table.toString() + "(" + pn + ")";
}
@@ -425,12 +440,13 @@
/**
* getProperty
- *
+ *
*/
public String getProperty(String name) {
- Map<String,String> params = getTPartition().getParameters();
- if (params == null)
+ Map<String, String> params = getTPartition().getParameters();
+ if (params == null) {
return null;
+ }
return params.get(name);
}
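For reference, the bucket selection in getPath(Sample) above reduces to simple modular arithmetic over the bucket count (bcount) and the sample fraction (scount). The following is a minimal standalone sketch of just that arithmetic; the class and method names are hypothetical and only the index computation mirrors the Hive code:

    import java.util.ArrayList;
    import java.util.List;

    public class SampleBucketMath {
      // Bucket indices read for sample number sampleNum out of scount,
      // when the partition holds bcount buckets (mirrors Partition.getPath(Sample)).
      static List<Integer> bucketsForSample(int bcount, int scount, int sampleNum) {
        List<Integer> ret = new ArrayList<Integer>();
        if (bcount == scount) {
          ret.add(sampleNum - 1);                  // exactly one bucket per sample
        } else if (bcount < scount) {
          if ((scount / bcount) * bcount != scount) {
            throw new IllegalArgumentException("sample count must be a multiple of bucket count");
          }
          ret.add((sampleNum - 1) % bcount);       // undersampling a single bucket
        } else {
          if ((bcount / scount) * scount != bcount) {
            throw new IllegalArgumentException("sample count must be a divisor of bucket count");
          }
          for (int i = 0; i < bcount / scount; i++) {
            ret.add(i * scount + (sampleNum - 1)); // sampling multiple buckets
          }
        }
        return ret;
      }

      public static void main(String[] args) {
        System.out.println(bucketsForSample(32, 8, 3)); // [2, 10, 18, 26]
        System.out.println(bucketsForSample(4, 8, 3));  // [2]
      }
    }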
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/RandomDimension.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/RandomDimension.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/RandomDimension.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/RandomDimension.java Thu Jan 21 10:37:58 2010
@@ -21,18 +21,21 @@
import java.util.Random;
/**
- * A random dimension is an abstract dimension.
- * It is implicitly associated with every row in data and has a random value
- *
+ * A random dimension is an abstract dimension. It is implicitly associated with
+ * every row in data and has a random value
+ *
**/
public class RandomDimension extends Dimension {
- Random r;
+ Random r;
- public RandomDimension(Class t, String id) {
- super(t, id);
- r = new Random();
- }
+ public RandomDimension(Class t, String id) {
+ super(t, id);
+ r = new Random();
+ }
- public int hashCode(Object o) { return r.nextInt(); }
+ @Override
+ public int hashCode(Object o) {
+ return r.nextInt();
+ }
}
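Because RandomDimension.hashCode ignores its argument and returns a fresh random value, sampling on this dimension selects an approximately uniform 1/fraction share of rows instead of a fixed bucket. A quick standalone sketch of that effect, reusing the same masked-modulo test that Sample.inSample applies; the class name is hypothetical:

    import java.util.Random;

    public class RandomSampleSketch {
      public static void main(String[] args) {
        Random r = new Random();
        int fraction = 10, hits = 0, rows = 100000;
        for (int i = 0; i < rows; i++) {
          int hash = r.nextInt();            // stand-in for RandomDimension.hashCode(o)
          if (((hash & Integer.MAX_VALUE) % fraction) == 0) {
            hits++;
          }
        }
        // Expect roughly rows / fraction hits, i.e. about 10000 of 100000.
        System.out.println(hits);
      }
    }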
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Sample.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Sample.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Sample.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Sample.java Thu Jan 21 10:37:58 2010
@@ -24,58 +24,72 @@
**/
public class Sample {
- protected int sampleNum;
- protected int sampleFraction;
- protected Dimension sampleDimension;
- protected int moduloNum;
-
- @SuppressWarnings("nls")
- public Sample(int num, int fraction, Dimension d) throws HiveException {
- if((num <= 0) || (num > fraction)) {
- throw new HiveException("Bad sample spec: " + num + "/" + fraction);
- }
- this.sampleNum = num;
- this.moduloNum = this.sampleNum-1;
- this.sampleFraction = fraction;
- this.sampleDimension = d;
+ protected int sampleNum;
+ protected int sampleFraction;
+ protected Dimension sampleDimension;
+ protected int moduloNum;
+
+ @SuppressWarnings("nls")
+ public Sample(int num, int fraction, Dimension d) throws HiveException {
+ if ((num <= 0) || (num > fraction)) {
+ throw new HiveException("Bad sample spec: " + num + "/" + fraction);
}
-
- /**
- * Given an arbitrary object, determine if it falls within this sample.
- */
- public boolean inSample(Object o) {
- return (((this.sampleDimension.hashCode(o) & Integer.MAX_VALUE) % this.sampleFraction) == this.moduloNum);
+ sampleNum = num;
+ moduloNum = sampleNum - 1;
+ sampleFraction = fraction;
+ sampleDimension = d;
+ }
+
+ /**
+ * Given an arbitrary object, determine if it falls within this sample.
+ */
+ public boolean inSample(Object o) {
+ return (((sampleDimension.hashCode(o) & Integer.MAX_VALUE) % sampleFraction) == moduloNum);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
}
-
- @Override
- public boolean equals (Object o) {
- if (this == o)
- return true;
- if (o == null)
- return false;
- if(o instanceof Sample) {
- Sample s = (Sample)o;
- return ((this.sampleNum == s.sampleNum) && (this.sampleFraction == s.sampleFraction) &&
- this.sampleDimension.equals(s.sampleDimension));
- }
- return (false);
+ if (o == null) {
+ return false;
}
-
- public int getSampleNum() { return this.sampleNum;}
- public int getSampleFraction() { return this.sampleFraction;}
- public Dimension getSampleDimension() { return this.sampleDimension;}
-
- @SuppressWarnings("nls")
- @Override
- public String toString() { return this.sampleNum+"/"+this.sampleFraction+"@("+this.sampleDimension+")";}
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((this.sampleDimension == null) ? 0 : this.sampleDimension.hashCode());
- result = prime * result + this.sampleFraction;
- result = prime * result + this.sampleNum;
- return result;
+ if (o instanceof Sample) {
+ Sample s = (Sample) o;
+ return ((sampleNum == s.sampleNum)
+ && (sampleFraction == s.sampleFraction) && sampleDimension
+ .equals(s.sampleDimension));
}
+ return (false);
+ }
+
+ public int getSampleNum() {
+ return sampleNum;
+ }
+
+ public int getSampleFraction() {
+ return sampleFraction;
+ }
+
+ public Dimension getSampleDimension() {
+ return sampleDimension;
+ }
+
+ @SuppressWarnings("nls")
+ @Override
+ public String toString() {
+ return sampleNum + "/" + sampleFraction + "@(" + sampleDimension + ")";
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result
+ + ((sampleDimension == null) ? 0 : sampleDimension.hashCode());
+ result = prime * result + sampleFraction;
+ result = prime * result + sampleNum;
+ return result;
+ }
}
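The inSample check above masks the hash with Integer.MAX_VALUE so the value fed to the modulo is never negative, then compares against sampleNum - 1. A small self-contained sketch of that predicate, with a plain int hash standing in for the Dimension.hashCode(o) call:

    public class InSampleCheck {
      // Mirrors Sample.inSample: a row is in sample n of f when
      // (hash & Integer.MAX_VALUE) % f == n - 1.
      static boolean inSample(int hash, int sampleNum, int sampleFraction) {
        int moduloNum = sampleNum - 1;
        return ((hash & Integer.MAX_VALUE) % sampleFraction) == moduloNum;
      }

      public static void main(String[] args) {
        // A negative hash still lands in a valid sample because the sign bit is cleared:
        // (-7 & Integer.MAX_VALUE) = 2147483641, and 2147483641 % 4 = 1.
        System.out.println(inSample(-7, 1, 4)); // false
        System.out.println(inSample(-7, 2, 4)); // true
      }
    }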
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java Thu Jan 21 10:37:58 2010
@@ -33,6 +33,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -48,14 +49,13 @@
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.InputFormat;
-
/**
- * A Hive Table: is a fundamental unit of data in Hive that shares a common schema/DDL
+ * A Hive Table: is a fundamental unit of data in Hive that shares a common
+ * schema/DDL
*/
public class Table {
@@ -70,36 +70,49 @@
/**
* Table (only used internally)
+ *
* @throws HiveException
- *
+ *
*/
protected Table() throws HiveException {
}
/**
* Table
- *
- * Create a TableMetaInfo object presumably with the intent of saving it to the metastore
- *
- * @param name the name of this table in the metadb
- * @param schema an object that represents the schema that this SerDe must know
- * @param deserializer a Class to be used for deserializing the data
- * @param dataLocation where is the table ? (e.g., dfs://hadoop001.sf2p.facebook.com:9000/user/facebook/warehouse/example) NOTE: should not be hardcoding this, but ok for now
- *
- * @exception HiveException on internal error. Note not possible now, but in the future reserve the right to throw an exception
+ *
+ * Create a TableMetaInfo object presumably with the intent of saving it to
+ * the metastore
+ *
+ * @param name
+ * the name of this table in the metadb
+ * @param schema
+ * an object that represents the schema that this SerDe must know
+ * @param deserializer
+ * a Class to be used for deserializing the data
+ * @param dataLocation
+ * where is the table ? (e.g.,
+ * dfs://hadoop001.sf2p.facebook.com:9000/
+ * user/facebook/warehouse/example) NOTE: should not be hardcoding
+ * this, but ok for now
+ *
+ * @exception HiveException
+ * on internal error. Note not possible now, but in the future
+ * reserve the right to throw an exception
*/
public Table(String name, Properties schema, Deserializer deserializer,
Class<? extends InputFormat<?, ?>> inputFormatClass,
- Class<?> outputFormatClass,
- URI dataLocation, Hive hive) throws HiveException {
+ Class<?> outputFormatClass, URI dataLocation, Hive hive)
+ throws HiveException {
initEmpty();
this.schema = schema;
- this.deserializer = deserializer; //TODO: convert to SerDeInfo format
- this.getTTable().getSd().getSerdeInfo().setSerializationLib(deserializer.getClass().getName());
+ this.deserializer = deserializer; // TODO: convert to SerDeInfo format
+ getTTable().getSd().getSerdeInfo().setSerializationLib(
+ deserializer.getClass().getName());
getTTable().setTableName(name);
getSerdeInfo().setSerializationLib(deserializer.getClass().getName());
setInputFormatClass(inputFormatClass);
- setOutputFormatClass(HiveFileFormatUtils.getOutputFormatSubstitute(outputFormatClass));
+ setOutputFormatClass(HiveFileFormatUtils
+ .getOutputFormatSubstitute(outputFormatClass));
setDataLocation(dataLocation);
}
@@ -108,9 +121,11 @@
initEmpty();
getTTable().setTableName(name);
getTTable().setDbName(MetaStoreUtils.DEFAULT_DATABASE_NAME);
- // We have to use MetadataTypedColumnsetSerDe because LazySimpleSerDe does not
+ // We have to use MetadataTypedColumnsetSerDe because LazySimpleSerDe does
+ // not
// support a table with no columns.
- getSerdeInfo().setSerializationLib(MetadataTypedColumnsetSerDe.class.getName());
+ getSerdeInfo().setSerializationLib(
+ MetadataTypedColumnsetSerDe.class.getName());
getSerdeInfo().getParameters().put(Constants.SERIALIZATION_FORMAT, "1");
}
@@ -133,7 +148,8 @@
public void reinitSerDe() throws HiveException {
try {
- deserializer = MetaStoreUtils.getDeserializer(Hive.get().getConf(), this.getTTable());
+ deserializer = MetaStoreUtils.getDeserializer(Hive.get().getConf(),
+ getTTable());
} catch (MetaException e) {
throw new HiveException(e);
}
@@ -142,7 +158,8 @@
protected void initSerDe() throws HiveException {
if (deserializer == null) {
try {
- deserializer = MetaStoreUtils.getDeserializer(Hive.get().getConf(), this.getTTable());
+ deserializer = MetaStoreUtils.getDeserializer(Hive.get().getConf(),
+ getTTable());
} catch (MetaException e) {
throw new HiveException(e);
}
@@ -152,11 +169,13 @@
public void checkValidity() throws HiveException {
// check for validity
String name = getTTable().getTableName();
- if (null == name || name.length() == 0 || !MetaStoreUtils.validateName(name)) {
+ if (null == name || name.length() == 0
+ || !MetaStoreUtils.validateName(name)) {
throw new HiveException("[" + name + "]: is not a valid table name");
}
if (0 == getCols().size()) {
- throw new HiveException("at least one column must be specified for the table");
+ throw new HiveException(
+ "at least one column must be specified for the table");
}
if (!isView()) {
if (null == getDeserializer()) {
@@ -177,20 +196,23 @@
Iterator<String> iter = colNames.iterator();
while (iter.hasNext()) {
String oldColName = iter.next();
- if (colName.equalsIgnoreCase(oldColName))
- throw new HiveException("Duplicate column name " + colName + " in the table definition.");
+ if (colName.equalsIgnoreCase(oldColName)) {
+ throw new HiveException("Duplicate column name " + colName
+ + " in the table definition.");
+ }
}
colNames.add(colName.toLowerCase());
}
- if (getPartCols() != null)
- {
+ if (getPartCols() != null) {
// there is no overlap between columns and partitioning columns
Iterator<FieldSchema> partColsIter = getPartCols().iterator();
while (partColsIter.hasNext()) {
String partCol = partColsIter.next().getName();
- if(colNames.contains(partCol.toLowerCase()))
- throw new HiveException("Partition column name " + partCol + " conflicts with table columns.");
+ if (colNames.contains(partCol.toLowerCase())) {
+ throw new HiveException("Partition column name " + partCol
+ + " conflicts with table columns.");
+ }
}
}
return;
@@ -208,11 +230,11 @@
* @param class1
*/
public void setOutputFormatClass(Class<?> class1) {
- this.outputFormatClass = HiveFileFormatUtils.getOutputFormatSubstitute(class1);
+ outputFormatClass = HiveFileFormatUtils.getOutputFormatSubstitute(class1);
tTable.getSd().setOutputFormat(class1.getName());
}
- final public Properties getSchema() {
+ final public Properties getSchema() {
return schema;
}
@@ -229,7 +251,7 @@
}
final public Deserializer getDeserializer() {
- if(deserializer == null) {
+ if (deserializer == null) {
try {
initSerDe();
} catch (HiveException e) {
@@ -247,24 +269,30 @@
return outputFormatClass;
}
- final public boolean isValidSpec(Map<String, String> spec) throws HiveException {
+ final public boolean isValidSpec(Map<String, String> spec)
+ throws HiveException {
// TODO - types need to be checked.
List<FieldSchema> partCols = getTTable().getPartitionKeys();
- if(partCols== null || (partCols.size() == 0)) {
- if (spec != null)
- throw new HiveException("table is not partitioned but partition spec exists: " + spec);
- else
+ if (partCols == null || (partCols.size() == 0)) {
+ if (spec != null) {
+ throw new HiveException(
+ "table is not partitioned but partition spec exists: " + spec);
+ } else {
return true;
+ }
}
- if((spec == null) || (spec.size() != partCols.size())) {
- throw new HiveException("table is partitioned but partition spec is not specified or tab: " + spec);
+ if ((spec == null) || (spec.size() != partCols.size())) {
+ throw new HiveException(
+ "table is partitioned but partition spec is not specified or tab: "
+ + spec);
}
for (FieldSchema field : partCols) {
- if(spec.get(field.getName()) == null) {
- throw new HiveException(field.getName() + " not found in table's partition spec: " + spec);
+ if (spec.get(field.getName()) == null) {
+ throw new HiveException(field.getName()
+ + " not found in table's partition spec: " + spec);
}
}
@@ -277,7 +305,7 @@
/**
* getProperty
- *
+ *
*/
public String getProperty(String name) {
return getTTable().getParameters().get(name);
@@ -285,14 +313,16 @@
public Vector<StructField> getFields() {
- Vector<StructField> fields = new Vector<StructField> ();
+ Vector<StructField> fields = new Vector<StructField>();
try {
Deserializer decoder = getDeserializer();
// Expand out all the columns of the table
- StructObjectInspector structObjectInspector = (StructObjectInspector)decoder.getObjectInspector();
- List<? extends StructField> fld_lst = structObjectInspector.getAllStructFieldRefs();
- for(StructField field: fld_lst) {
+ StructObjectInspector structObjectInspector = (StructObjectInspector) decoder
+ .getObjectInspector();
+ List<? extends StructField> fld_lst = structObjectInspector
+ .getAllStructFieldRefs();
+ for (StructField field : fld_lst) {
fields.add(field);
}
} catch (SerDeException e) {
@@ -303,35 +333,38 @@
public StructField getField(String fld) {
try {
- StructObjectInspector structObjectInspector = (StructObjectInspector)getDeserializer().getObjectInspector();
+ StructObjectInspector structObjectInspector = (StructObjectInspector) getDeserializer()
+ .getObjectInspector();
return structObjectInspector.getStructFieldRef(fld);
- }
- catch (Exception e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
- * @param schema the schema to set
+ * @param schema
+ * the schema to set
*/
public void setSchema(Properties schema) {
this.schema = schema;
}
/**
- * @param deserializer the deserializer to set
+ * @param deserializer
+ * the deserializer to set
*/
public void setDeserializer(Deserializer deserializer) {
this.deserializer = deserializer;
}
+ @Override
public String toString() {
return getTTable().getTableName();
}
public List<FieldSchema> getPartCols() {
List<FieldSchema> partKeys = getTTable().getPartitionKeys();
- if(partKeys == null) {
+ if (partKeys == null) {
partKeys = new ArrayList<FieldSchema>();
getTTable().setPartitionKeys(partKeys);
}
@@ -340,22 +373,23 @@
public boolean isPartitionKey(String colName) {
for (FieldSchema key : getPartCols()) {
- if(key.getName().toLowerCase().equals(colName)) {
+ if (key.getName().toLowerCase().equals(colName)) {
return true;
}
}
return false;
}
- //TODO merge this with getBucketCols function
+ // TODO merge this with getBucketCols function
public String getBucketingDimensionId() {
List<String> bcols = getTTable().getSd().getBucketCols();
- if(bcols == null || bcols.size() == 0) {
+ if (bcols == null || bcols.size() == 0) {
return null;
}
- if(bcols.size() > 1) {
- LOG.warn(this + " table has more than one dimensions which aren't supported yet");
+ if (bcols.size() > 1) {
+ LOG.warn(this
+ + " table has more than one dimensions which aren't supported yet");
}
return bcols.get(0);
@@ -369,7 +403,8 @@
}
/**
- * @param table the tTable to set
+ * @param table
+ * the tTable to set
*/
protected void setTTable(org.apache.hadoop.hive.metastore.api.Table table) {
tTable = table;
@@ -386,8 +421,10 @@
}
for (String col : bucketCols) {
- if(!isField(col))
- throw new HiveException("Bucket columns " + col + " is not part of the table columns" );
+ if (!isField(col)) {
+ throw new HiveException("Bucket columns " + col
+ + " is not part of the table columns");
+ }
}
getTTable().getSd().setBucketCols(bucketCols);
}
@@ -398,7 +435,7 @@
private boolean isField(String col) {
for (FieldSchema field : getCols()) {
- if(field.getName().equals(col)) {
+ if (field.getName().equals(col)) {
return true;
}
}
@@ -407,29 +444,33 @@
public List<FieldSchema> getCols() {
boolean isNative = SerDeUtils.isNativeSerDe(getSerializationLib());
- if (isNative)
+ if (isNative) {
return getTTable().getSd().getCols();
- else {
+ } else {
try {
return Hive.getFieldsFromDeserializer(getName(), getDeserializer());
} catch (HiveException e) {
- LOG.error("Unable to get field from serde: " + getSerializationLib(), e);
+ LOG
+ .error("Unable to get field from serde: " + getSerializationLib(),
+ e);
}
return new ArrayList<FieldSchema>();
}
}
/**
- * Returns a list of all the columns of the table (data columns + partition columns in that order.
- *
+ * Returns a list of all the columns of the table (data columns + partition
+ * columns in that order.
+ *
* @return List<FieldSchema>
*/
public List<FieldSchema> getAllCols() {
- ArrayList<FieldSchema> f_list = new ArrayList<FieldSchema>();
- f_list.addAll(getPartCols());
- f_list.addAll(getCols());
- return f_list;
+ ArrayList<FieldSchema> f_list = new ArrayList<FieldSchema>();
+ f_list.addAll(getPartCols());
+ f_list.addAll(getCols());
+ return f_list;
}
+
public void setPartCols(List<FieldSchema> partCols) {
getTTable().setPartitionKeys(partCols);
}
@@ -443,9 +484,13 @@
}
/**
- * Replaces files in the partition with new data set specified by srcf. Works by moving files
- * @param srcf Files to be replaced. Leaf directories or globbed file paths
- * @param tmpd Temporary directory
+ * Replaces files in the partition with new data set specified by srcf. Works
+ * by moving files
+ *
+ * @param srcf
+ * Files to be replaced. Leaf directories or globbed file paths
+ * @param tmpd
+ * Temporary directory
*/
protected void replaceFiles(Path srcf, Path tmpd) throws HiveException {
FileSystem fs;
@@ -459,7 +504,9 @@
/**
* Inserts files specified into the partition. Works by moving files
- * @param srcf Files to be moved. Leaf directories or globbed file paths
+ *
+ * @param srcf
+ * Files to be moved. Leaf directories or globbed file paths
*/
protected void copyFiles(Path srcf) throws HiveException {
FileSystem fs;
@@ -473,8 +520,8 @@
public void setInputFormatClass(String name) throws HiveException {
try {
- setInputFormatClass((Class<? extends InputFormat<WritableComparable, Writable>>)
- Class.forName(name, true, JavaUtils.getClassLoader()));
+ setInputFormatClass((Class<? extends InputFormat<WritableComparable, Writable>>) Class
+ .forName(name, true, JavaUtils.getClassLoader()));
} catch (ClassNotFoundException e) {
throw new HiveException("Class not found: " + name, e);
}
@@ -483,15 +530,15 @@
public void setOutputFormatClass(String name) throws HiveException {
try {
Class<?> origin = Class.forName(name, true, JavaUtils.getClassLoader());
- setOutputFormatClass(HiveFileFormatUtils.getOutputFormatSubstitute(origin));
+ setOutputFormatClass(HiveFileFormatUtils
+ .getOutputFormatSubstitute(origin));
} catch (ClassNotFoundException e) {
throw new HiveException("Class not found: " + name, e);
}
}
-
public boolean isPartitioned() {
- if(getPartCols() == null) {
+ if (getPartCols() == null) {
return false;
}
return (getPartCols().size() != 0);
@@ -581,7 +628,8 @@
}
/**
- * @param viewOriginalText the original view text to set
+ * @param viewOriginalText
+ * the original view text to set
*/
public void setViewOriginalText(String viewOriginalText) {
getTTable().setViewOriginalText(viewOriginalText);
@@ -595,7 +643,8 @@
}
/**
- * @param viewExpandedText the expanded view text to set
+ * @param viewExpandedText
+ * the expanded view text to set
*/
public void setViewExpandedText(String viewExpandedText) {
getTTable().setViewExpandedText(viewExpandedText);
@@ -609,13 +658,15 @@
// be set, or neither
boolean hasExpandedText = (getViewExpandedText() != null);
boolean hasOriginalText = (getViewOriginalText() != null);
- assert(hasExpandedText == hasOriginalText);
+ assert (hasExpandedText == hasOriginalText);
return hasExpandedText;
}
/**
* Creates a partition name -> value spec map object
- * @param tp Use the information from this partition.
+ *
+ * @param tp
+ * Use the information from this partition.
* @return Partition name to value mapping.
*/
public LinkedHashMap<String, String> createSpec(
@@ -635,13 +686,13 @@
public Table copy() throws HiveException {
Table newTbl = new Table();
- newTbl.schema = this.schema;
- newTbl.deserializer = this.deserializer; //TODO: convert to SerDeInfo format
+ newTbl.schema = schema;
+ newTbl.deserializer = deserializer; // TODO: convert to SerDeInfo format
newTbl.setTTable(getTTable().clone());
- newTbl.uri = this.uri;
- newTbl.inputFormatClass = this.inputFormatClass;
- newTbl.outputFormatClass = this.outputFormatClass;
+ newTbl.uri = uri;
+ newTbl.inputFormatClass = inputFormatClass;
+ newTbl.outputFormatClass = outputFormatClass;
return newTbl;
}
};
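The reworked isValidSpec above still enforces the same rule: a partition spec is accepted only when it supplies a value for each partition column and contains no extra entries. Below is a minimal sketch of that rule using plain collections in place of FieldSchema and a boolean result in place of the HiveException the real method throws; the names are hypothetical:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class PartSpecCheck {
      // Valid only if the spec has exactly one entry per partition column.
      static boolean isValidSpec(List<String> partCols, Map<String, String> spec) {
        if (partCols == null || partCols.isEmpty()) {
          return spec == null;                   // unpartitioned table: no spec allowed
        }
        if (spec == null || spec.size() != partCols.size()) {
          return false;                          // wrong number of partition values
        }
        for (String col : partCols) {
          if (spec.get(col) == null) {
            return false;                        // a partition column is missing
          }
        }
        return true;
      }

      public static void main(String[] args) {
        Map<String, String> spec = new HashMap<String, String>();
        spec.put("ds", "2010-01-21");
        spec.put("hr", "11");
        System.out.println(isValidSpec(Arrays.asList("ds", "hr"), spec)); // true
        System.out.println(isValidSpec(Arrays.asList("ds"), spec));       // false: extra key
      }
    }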
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPruner.java Thu Jan 21 10:37:58 2010
@@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
+
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ScriptOperator;
@@ -30,28 +31,27 @@
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
-import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.Rule;
import org.apache.hadoop.hive.ql.lib.RuleRegExp;
import org.apache.hadoop.hive.ql.parse.OpParseContext;
import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.RowResolver;
import org.apache.hadoop.hive.ql.parse.SemanticException;
/**
- * Implementation of one of the rule-based optimization steps. ColumnPruner gets the current operator tree. The \
- * tree is traversed to find out the columns used
- * for all the base tables. If all the columns for a table are not used, a select is pushed on top of that table
- * (to select only those columns). Since this
- * changes the row resolver, the tree is built again. This can be optimized later to patch the tree.
+ * Implementation of one of the rule-based optimization steps. ColumnPruner gets
+ * the current operator tree. The \ tree is traversed to find out the columns
+ * used for all the base tables. If all the columns for a table are not used, a
+ * select is pushed on top of that table (to select only those columns). Since
+ * this changes the row resolver, the tree is built again. This can be optimized
+ * later to patch the tree.
*/
public class ColumnPruner implements Transform {
protected ParseContext pGraphContext;
private HashMap<Operator<? extends Serializable>, OpParseContext> opToParseCtxMap;
-
/**
* empty constructor
*/
@@ -60,54 +60,55 @@
}
/**
- * update the map between operator and row resolver
- * @param op operator being inserted
- * @param rr row resolver of the operator
- * @return
- */
- @SuppressWarnings("nls")
- private Operator<? extends Serializable> putOpInsertMap(Operator<? extends Serializable> op, RowResolver rr) {
- OpParseContext ctx = new OpParseContext(rr);
- pGraphContext.getOpParseCtx().put(op, ctx);
- return op;
- }
-
- /**
- * Transform the query tree. For each table under consideration, check if all columns are needed. If not,
- * only select the operators needed at the beginning and proceed
- * @param pactx the current parse context
+ * Transform the query tree. For each table under consideration, check if all
+ * columns are needed. If not, only select the operators needed at the
+ * beginning and proceed
+ *
+ * @param pactx
+ * the current parse context
*/
public ParseContext transform(ParseContext pactx) throws SemanticException {
- this.pGraphContext = pactx;
- this.opToParseCtxMap = pGraphContext.getOpParseCtx();
+ pGraphContext = pactx;
+ opToParseCtxMap = pGraphContext.getOpParseCtx();
// generate pruned column list for all relevant operators
ColumnPrunerProcCtx cppCtx = new ColumnPrunerProcCtx(opToParseCtxMap);
-
- // create a walker which walks the tree in a DFS manner while maintaining the operator stack. The dispatcher
+
+ // create a walker which walks the tree in a DFS manner while maintaining
+ // the operator stack. The dispatcher
// generates the plan from the operator tree
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- opRules.put(new RuleRegExp("R1", "FIL%"), ColumnPrunerProcFactory.getFilterProc());
- opRules.put(new RuleRegExp("R2", "GBY%"), ColumnPrunerProcFactory.getGroupByProc());
- opRules.put(new RuleRegExp("R3", "RS%"), ColumnPrunerProcFactory.getReduceSinkProc());
- opRules.put(new RuleRegExp("R4", "SEL%"), ColumnPrunerProcFactory.getSelectProc());
- opRules.put(new RuleRegExp("R5", "JOIN%"), ColumnPrunerProcFactory.getJoinProc());
- opRules.put(new RuleRegExp("R6", "MAPJOIN%"), ColumnPrunerProcFactory.getMapJoinProc());
- opRules.put(new RuleRegExp("R7", "TS%"), ColumnPrunerProcFactory.getTableScanProc());
-
- // The dispatcher fires the processor corresponding to the closest matching rule and passes the context along
- Dispatcher disp = new DefaultRuleDispatcher(ColumnPrunerProcFactory.getDefaultProc(), opRules, cppCtx);
+ opRules.put(new RuleRegExp("R1", "FIL%"), ColumnPrunerProcFactory
+ .getFilterProc());
+ opRules.put(new RuleRegExp("R2", "GBY%"), ColumnPrunerProcFactory
+ .getGroupByProc());
+ opRules.put(new RuleRegExp("R3", "RS%"), ColumnPrunerProcFactory
+ .getReduceSinkProc());
+ opRules.put(new RuleRegExp("R4", "SEL%"), ColumnPrunerProcFactory
+ .getSelectProc());
+ opRules.put(new RuleRegExp("R5", "JOIN%"), ColumnPrunerProcFactory
+ .getJoinProc());
+ opRules.put(new RuleRegExp("R6", "MAPJOIN%"), ColumnPrunerProcFactory
+ .getMapJoinProc());
+ opRules.put(new RuleRegExp("R7", "TS%"), ColumnPrunerProcFactory
+ .getTableScanProc());
+
+ // The dispatcher fires the processor corresponding to the closest matching
+ // rule and passes the context along
+ Dispatcher disp = new DefaultRuleDispatcher(ColumnPrunerProcFactory
+ .getDefaultProc(), opRules, cppCtx);
GraphWalker ogw = new ColumnPrunerWalker(disp);
-
+
// Create a list of topop nodes
ArrayList<Node> topNodes = new ArrayList<Node>();
topNodes.addAll(pGraphContext.getTopOps().values());
ogw.startWalking(topNodes, null);
return pGraphContext;
}
-
+
/**
- * Walks the op tree in post order fashion (skips selects with file sink or script op children)
+ * Walks the op tree in post order fashion (skips selects with file sink or
+ * script op children)
*/
public static class ColumnPrunerWalker extends DefaultGraphWalker {
@@ -123,18 +124,20 @@
boolean walkChildren = true;
opStack.push(nd);
- // no need to go further down for a select op with a file sink or script child
+ // no need to go further down for a select op with a file sink or script
+ // child
// since all cols are needed for these ops
- if(nd instanceof SelectOperator) {
- for(Node child: nd.getChildren()) {
- if ((child instanceof FileSinkOperator) || (child instanceof ScriptOperator))
+ if (nd instanceof SelectOperator) {
+ for (Node child : nd.getChildren()) {
+ if ((child instanceof FileSinkOperator)
+ || (child instanceof ScriptOperator)) {
walkChildren = false;
+ }
}
}
- if((nd.getChildren() == null)
- || getDispatchedList().containsAll(nd.getChildren())
- || !walkChildren) {
+ if ((nd.getChildren() == null)
+ || getDispatchedList().containsAll(nd.getChildren()) || !walkChildren) {
// all children are done or no need to walk the children
dispatch(nd, opStack);
opStack.pop();
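The transform method above registers one NodeProcessor per operator-name pattern ("FIL%", "GBY%", "RS%", ...) and lets a rule dispatcher pick the processor for each node as the walker visits the operator DAG. The following is a simplified standalone analogue of that register-and-dispatch pattern; it uses plain strings and first-match-by-prefix rather than the Hive lib classes, so treat it as an illustration of the shape, not the actual API:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class RuleDispatchSketch {
      interface NodeProcessor {
        void process(String node);
      }

      public static void main(String[] args) {
        // Register processors against operator-name patterns, much like the
        // opRules.put(new RuleRegExp(...), ...) calls in ColumnPruner.transform.
        Map<String, NodeProcessor> opRules = new LinkedHashMap<String, NodeProcessor>();
        opRules.put("FIL", node -> System.out.println("prune columns at filter: " + node));
        opRules.put("SEL", node -> System.out.println("prune columns at select: " + node));
        NodeProcessor defaultProc = node -> System.out.println("default pruning: " + node);

        // Dispatch each visited node to the first matching rule, else the default.
        for (String node : new String[] { "FIL_1", "SEL_2", "RS_3" }) {
          NodeProcessor proc = defaultProc;
          for (Map.Entry<String, NodeProcessor> e : opRules.entrySet()) {
            if (node.startsWith(e.getKey())) {
              proc = e.getValue();
              break;
            }
          }
          proc.process(node);
        }
      }
    }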
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcCtx.java Thu Jan 21 10:37:58 2010
@@ -25,8 +25,6 @@
import java.util.Map;
import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
-import org.apache.hadoop.hive.ql.exec.JoinOperator;
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.SelectOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -40,24 +38,24 @@
* This class implements the processor context for Column Pruner.
*/
public class ColumnPrunerProcCtx implements NodeProcessorCtx {
-
- private Map<Operator<? extends Serializable>,List<String>> prunedColLists;
-
- private HashMap<Operator<? extends Serializable>, OpParseContext> opToParseCtxMap;
-
- private Map<CommonJoinOperator,Map<Byte,List<String>>> joinPrunedColLists;
-
- public ColumnPrunerProcCtx(HashMap<Operator<? extends Serializable>, OpParseContext> opToParseContextMap) {
+ private final Map<Operator<? extends Serializable>, List<String>> prunedColLists;
+
+ private final HashMap<Operator<? extends Serializable>, OpParseContext> opToParseCtxMap;
+
+ private final Map<CommonJoinOperator, Map<Byte, List<String>>> joinPrunedColLists;
+
+ public ColumnPrunerProcCtx(
+ HashMap<Operator<? extends Serializable>, OpParseContext> opToParseContextMap) {
prunedColLists = new HashMap<Operator<? extends Serializable>, List<String>>();
- this.opToParseCtxMap = opToParseContextMap;
- joinPrunedColLists = new HashMap<CommonJoinOperator,Map<Byte,List<String>>>();
+ opToParseCtxMap = opToParseContextMap;
+ joinPrunedColLists = new HashMap<CommonJoinOperator, Map<Byte, List<String>>>();
}
public Map<CommonJoinOperator, Map<Byte, List<String>>> getJoinPrunedColLists() {
return joinPrunedColLists;
}
-
+
/**
* @return the prunedColLists
*/
@@ -68,30 +66,31 @@
public HashMap<Operator<? extends Serializable>, OpParseContext> getOpToParseCtxMap() {
return opToParseCtxMap;
}
-
+
public Map<Operator<? extends Serializable>, List<String>> getPrunedColLists() {
return prunedColLists;
}
-
+
/**
- * Creates the list of internal column names(these names are used in the RowResolver and
- * are different from the external column names) that are needed in the subtree. These columns
- * eventually have to be selected from the table scan.
+ * Creates the list of internal column names(these names are used in the
+ * RowResolver and are different from the external column names) that are
+ * needed in the subtree. These columns eventually have to be selected from
+ * the table scan.
*
- * @param curOp The root of the operator subtree.
+ * @param curOp
+ * The root of the operator subtree.
* @return List<String> of the internal column names.
* @throws SemanticException
*/
- public List<String> genColLists(Operator<? extends Serializable> curOp) throws SemanticException {
+ public List<String> genColLists(Operator<? extends Serializable> curOp)
+ throws SemanticException {
List<String> colList = new ArrayList<String>();
- if(curOp.getChildOperators() != null) {
+ if (curOp.getChildOperators() != null) {
for (Operator<? extends Serializable> child : curOp.getChildOperators()) {
if (child instanceof CommonJoinOperator) {
int tag = child.getParentOperators().indexOf(curOp);
- List<String> prunList = joinPrunedColLists.get((CommonJoinOperator) child).get(
- (byte) tag);
- colList = Utilities
- .mergeUniqElems(colList, prunList);
+ List<String> prunList = joinPrunedColLists.get(child).get((byte) tag);
+ colList = Utilities.mergeUniqElems(colList, prunList);
} else {
colList = Utilities
.mergeUniqElems(colList, prunedColLists.get(child));
@@ -100,52 +99,60 @@
}
return colList;
}
-
+
/**
- * Creates the list of internal column names from select expressions in a select operator.
- * This function is used for the select operator instead of the genColLists function (which is
- * used by the rest of the operators).
+ * Creates the list of internal column names from select expressions in a
+ * select operator. This function is used for the select operator instead of
+ * the genColLists function (which is used by the rest of the operators).
*
- * @param op The select operator.
+ * @param op
+ * The select operator.
* @return List<String> of the internal column names.
*/
public List<String> getColsFromSelectExpr(SelectOperator op) {
List<String> cols = new ArrayList<String>();
selectDesc conf = op.getConf();
ArrayList<exprNodeDesc> exprList = conf.getColList();
- for (exprNodeDesc expr : exprList)
+ for (exprNodeDesc expr : exprList) {
cols = Utilities.mergeUniqElems(cols, expr.getCols());
+ }
return cols;
}
/**
* Creates the list of internal column names for select * expressions.
*
- * @param op The select operator.
- * @param colList The list of internal column names returned by the children of the select operator.
+ * @param op
+ * The select operator.
+ * @param colList
+ * The list of internal column names returned by the children of the
+ * select operator.
* @return List<String> of the internal column names.
*/
- public List<String> getSelectColsFromChildren(SelectOperator op, List<String> colList) {
+ public List<String> getSelectColsFromChildren(SelectOperator op,
+ List<String> colList) {
List<String> cols = new ArrayList<String>();
selectDesc conf = op.getConf();
-
- if(conf.isSelStarNoCompute()){
+
+ if (conf.isSelStarNoCompute()) {
cols.addAll(colList);
return cols;
}
-
+
ArrayList<exprNodeDesc> selectExprs = conf.getColList();
-
- // The colList is the output columns used by child operators, they are different
- // from input columns of the current operator. we need to find out which input columns are used.
+
+ // The colList is the output columns used by child operators, they are
+ // different
+ // from input columns of the current operator. we need to find out which
+ // input columns are used.
ArrayList<String> outputColumnNames = conf.getOutputColumnNames();
- for(int i=0;i<outputColumnNames.size();i++){
- if(colList.contains(outputColumnNames.get(i))){
+ for (int i = 0; i < outputColumnNames.size(); i++) {
+ if (colList.contains(outputColumnNames.get(i))) {
exprNodeDesc expr = selectExprs.get(i);
cols = Utilities.mergeUniqElems(cols, expr.getCols());
}
}
-
+
return cols;
}
}
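getSelectColsFromChildren above works by index alignment: each output column name the child still needs sits at the same position as the select expression that produces it, so that expression's input columns are the ones to keep. A standalone sketch of that alignment step, with lists of strings standing in for exprNodeDesc and Utilities.mergeUniqElems (names hypothetical):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class SelectColPruneSketch {
      // For each output column a child needs, keep the input columns used by the
      // select expression at the same index, de-duplicating along the way.
      static List<String> inputColsNeeded(List<String> outputColumnNames,
          List<List<String>> exprInputCols, List<String> neededOutputs) {
        List<String> cols = new ArrayList<String>();
        for (int i = 0; i < outputColumnNames.size(); i++) {
          if (neededOutputs.contains(outputColumnNames.get(i))) {
            for (String c : exprInputCols.get(i)) {
              if (!cols.contains(c)) {
                cols.add(c);
              }
            }
          }
        }
        return cols;
      }

      public static void main(String[] args) {
        List<String> outputs = Arrays.asList("_col0", "_col1");
        List<List<String>> exprCols = Arrays.asList(
            Arrays.asList("key"), Arrays.asList("key", "value"));
        // The child only needs _col1, so only the second expression's inputs survive.
        System.out.println(inputColsNeeded(outputs, exprCols, Arrays.asList("_col1")));
        // prints [key, value]
      }
    }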
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java Thu Jan 21 10:37:58 2010
@@ -60,7 +60,6 @@
import org.apache.hadoop.hive.ql.plan.reduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.selectDesc;
import org.apache.hadoop.hive.ql.plan.tableDesc;
-import org.apache.hadoop.hive.ql.plan.tableScanDesc;
/**
* Factory for generating the different node processors used by ColumnPruner.
@@ -70,45 +69,51 @@
/**
* Node Processor for Column Pruning on Filter Operators.
*/
- public static class ColumnPrunerFilterProc implements NodeProcessor {
- public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException {
- FilterOperator op = (FilterOperator)nd;
- ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx)ctx;
+ public static class ColumnPrunerFilterProc implements NodeProcessor {
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
+ Object... nodeOutputs) throws SemanticException {
+ FilterOperator op = (FilterOperator) nd;
+ ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx;
exprNodeDesc condn = op.getConf().getPredicate();
// get list of columns used in the filter
List<String> cl = condn.getCols();
// merge it with the downstream col list
- cppCtx.getPrunedColLists().put(op, Utilities.mergeUniqElems(cppCtx.genColLists(op), cl));
+ cppCtx.getPrunedColLists().put(op,
+ Utilities.mergeUniqElems(cppCtx.genColLists(op), cl));
return null;
}
}
-
+
/**
* Factory method to get the ColumnPrunerFilterProc class.
+ *
* @return ColumnPrunerFilterProc
*/
public static ColumnPrunerFilterProc getFilterProc() {
return new ColumnPrunerFilterProc();
}
-
+
/**
* Node Processor for Column Pruning on Group By Operators.
*/
public static class ColumnPrunerGroupByProc implements NodeProcessor {
- public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException {
- GroupByOperator op = (GroupByOperator)nd;
- ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx)ctx;
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
+ Object... nodeOutputs) throws SemanticException {
+ GroupByOperator op = (GroupByOperator) nd;
+ ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx;
List<String> colLists = new ArrayList<String>();
groupByDesc conf = op.getConf();
ArrayList<exprNodeDesc> keys = conf.getKeys();
- for (exprNodeDesc key : keys)
+ for (exprNodeDesc key : keys) {
colLists = Utilities.mergeUniqElems(colLists, key.getCols());
+ }
ArrayList<aggregationDesc> aggrs = conf.getAggregators();
- for (aggregationDesc aggr : aggrs) {
+ for (aggregationDesc aggr : aggrs) {
ArrayList<exprNodeDesc> params = aggr.getParameters();
- for (exprNodeDesc param : params)
+ for (exprNodeDesc param : params) {
colLists = Utilities.mergeUniqElems(colLists, param.getCols());
+ }
}
cppCtx.getPrunedColLists().put(op, colLists);
@@ -118,6 +123,7 @@
/**
* Factory method to get the ColumnPrunerGroupByProc class.
+ *
* @return ColumnPrunerGroupByProc
*/
public static ColumnPrunerGroupByProc getGroupByProc() {
@@ -128,17 +134,19 @@
* The Default Node Processor for Column Pruning.
*/
public static class ColumnPrunerDefaultProc implements NodeProcessor {
- public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException {
- ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx)ctx;
- cppCtx.getPrunedColLists().put((Operator<? extends Serializable>)nd,
- cppCtx.genColLists((Operator<? extends Serializable>)nd));
-
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
+ Object... nodeOutputs) throws SemanticException {
+ ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx;
+ cppCtx.getPrunedColLists().put((Operator<? extends Serializable>) nd,
+ cppCtx.genColLists((Operator<? extends Serializable>) nd));
+
return null;
}
}
/**
* Factory method to get the ColumnPrunerDefaultProc class.
+ *
* @return ColumnPrunerDefaultProc
*/
public static ColumnPrunerDefaultProc getDefaultProc() {
@@ -146,15 +154,18 @@
}
/**
- * The Node Processor for Column Pruning on Table Scan Operators. It will store
- * needed columns in tableScanDesc.
+ * The Node Processor for Column Pruning on Table Scan Operators. It will
+ * store needed columns in tableScanDesc.
*/
public static class ColumnPrunerTableScanProc implements NodeProcessor {
- public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException {
- TableScanOperator scanOp = (TableScanOperator)nd;
- ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx)ctx;
- List<String> cols = cppCtx.genColLists((Operator<? extends Serializable>)nd);
- cppCtx.getPrunedColLists().put((Operator<? extends Serializable>)nd, cols);
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
+ Object... nodeOutputs) throws SemanticException {
+ TableScanOperator scanOp = (TableScanOperator) nd;
+ ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx;
+ List<String> cols = cppCtx
+ .genColLists((Operator<? extends Serializable>) nd);
+ cppCtx.getPrunedColLists().put((Operator<? extends Serializable>) nd,
+ cols);
ArrayList<Integer> needed_columns = new ArrayList<Integer>();
RowResolver inputRR = cppCtx.getOpToParseCtxMap().get(scanOp).getRR();
for (int i = 0; i < cols.size(); i++) {
@@ -168,42 +179,50 @@
/**
* Factory method to get the ColumnPrunerDefaultProc class.
+ *
* @return ColumnPrunerTableScanProc
*/
public static ColumnPrunerTableScanProc getTableScanProc() {
return new ColumnPrunerTableScanProc();
}
-
+
/**
* The Node Processor for Column Pruning on Reduce Sink Operators.
*/
public static class ColumnPrunerReduceSinkProc implements NodeProcessor {
- public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException {
- ReduceSinkOperator op = (ReduceSinkOperator)nd;
- ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx)ctx;
- HashMap<Operator<? extends Serializable>, OpParseContext> opToParseCtxMap =
- cppCtx.getOpToParseCtxMap();
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
+ Object... nodeOutputs) throws SemanticException {
+ ReduceSinkOperator op = (ReduceSinkOperator) nd;
+ ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx;
+ HashMap<Operator<? extends Serializable>, OpParseContext> opToParseCtxMap = cppCtx
+ .getOpToParseCtxMap();
RowResolver redSinkRR = opToParseCtxMap.get(op).getRR();
reduceSinkDesc conf = op.getConf();
- List<Operator<? extends Serializable>> childOperators = op.getChildOperators();
- List<Operator<? extends Serializable>> parentOperators = op.getParentOperators();
+ List<Operator<? extends Serializable>> childOperators = op
+ .getChildOperators();
+ List<Operator<? extends Serializable>> parentOperators = op
+ .getParentOperators();
List<String> colLists = new ArrayList<String>();
ArrayList<exprNodeDesc> keys = conf.getKeyCols();
- for (exprNodeDesc key : keys)
+ for (exprNodeDesc key : keys) {
colLists = Utilities.mergeUniqElems(colLists, key.getCols());
+ }
- if ((childOperators.size() == 1) && (childOperators.get(0) instanceof JoinOperator)) {
+ if ((childOperators.size() == 1)
+ && (childOperators.get(0) instanceof JoinOperator)) {
assert parentOperators.size() == 1;
Operator<? extends Serializable> par = parentOperators.get(0);
- JoinOperator childJoin = (JoinOperator)childOperators.get(0);
+ JoinOperator childJoin = (JoinOperator) childOperators.get(0);
RowResolver parRR = opToParseCtxMap.get(par).getRR();
- List<String> childJoinCols = cppCtx.getJoinPrunedColLists().get(childJoin).get((byte)conf.getTag());
+ List<String> childJoinCols = cppCtx.getJoinPrunedColLists().get(
+ childJoin).get((byte) conf.getTag());
boolean[] flags = new boolean[conf.getValueCols().size()];
- for (int i = 0; i < flags.length; i++)
+ for (int i = 0; i < flags.length; i++) {
flags[i] = false;
+ }
if (childJoinCols != null && childJoinCols.size() > 0) {
- Map<String,exprNodeDesc> exprMap = op.getColumnExprMap();
+ Map<String, exprNodeDesc> exprMap = op.getColumnExprMap();
for (String childCol : childJoinCols) {
exprNodeDesc desc = exprMap.get(childCol);
int index = conf.getValueCols().indexOf(desc);
@@ -211,19 +230,21 @@
String[] nm = redSinkRR.reverseLookup(childCol);
if (nm != null) {
ColumnInfo cInfo = parRR.get(nm[0], nm[1]);
- if (!colLists.contains(cInfo.getInternalName()))
+ if (!colLists.contains(cInfo.getInternalName())) {
colLists.add(cInfo.getInternalName());
+ }
}
}
}
Collections.sort(colLists);
pruneReduceSinkOperator(flags, op, cppCtx);
- }
- else {
- // Reduce Sink contains the columns needed - no need to aggregate from children
+ } else {
+ // Reduce Sink contains the columns needed - no need to aggregate from
+ // children
ArrayList<exprNodeDesc> vals = conf.getValueCols();
- for (exprNodeDesc val : vals)
+ for (exprNodeDesc val : vals) {
colLists = Utilities.mergeUniqElems(colLists, val.getCols());
+ }
}
cppCtx.getPrunedColLists().put(op, colLists);
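
When the reduce sink feeds a join, the processor above consults the join's pruned column list and marks which value columns are still needed before calling pruneReduceSinkOperator. A rough sketch of that retain-flag computation follows, with made-up column names in place of Hive's exprNodeDesc lookups.

import java.util.Arrays;
import java.util.List;

// Sketch of the retain-flag idea: for every value column of the reduce sink,
// mark whether a column the child join still needs maps back to it. Names and
// shapes here are illustrative, not Hive's API.
public class RetainFlagsSketch {
  static boolean[] retainFlags(List<String> valueCols, List<String> neededByChild) {
    boolean[] flags = new boolean[valueCols.size()];
    for (String needed : neededByChild) {
      int index = valueCols.indexOf(needed);
      if (index >= 0) {
        flags[index] = true;
      }
    }
    return flags;
  }

  public static void main(String[] args) {
    boolean[] flags = retainFlags(
        Arrays.asList("_col0", "_col1", "_col2"),
        Arrays.asList("_col2", "_col0"));
    System.out.println(Arrays.toString(flags)); // [true, false, true]
  }
}
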
@@ -233,6 +254,7 @@
/**
* The Factory method to get ColumnPrunerReduceSinkProc class.
+ *
* @return ColumnPrunerReduceSinkProc
*/
public static ColumnPrunerReduceSinkProc getReduceSinkProc() {
@@ -243,20 +265,25 @@
* The Node Processor for Column Pruning on Select Operators.
*/
public static class ColumnPrunerSelectProc implements NodeProcessor {
- public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, Object... nodeOutputs) throws SemanticException {
- SelectOperator op = (SelectOperator)nd;
- ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx)ctx;
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
+ Object... nodeOutputs) throws SemanticException {
+ SelectOperator op = (SelectOperator) nd;
+ ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx;
List<String> cols = new ArrayList<String>();
- if(op.getChildOperators() != null) {
- for(Operator<? extends Serializable> child: op.getChildOperators()) {
+ if (op.getChildOperators() != null) {
+ for (Operator<? extends Serializable> child : op.getChildOperators()) {
// If one of my children is a FileSink or Script, return all columns.
- // Without this break, a bug in ReduceSink to Extract edge column pruning will manifest
+ // Without this break, a bug in ReduceSink to Extract edge column
+ // pruning will manifest
// which should be fixed before remove this
if ((child instanceof FileSinkOperator)
- || (child instanceof ScriptOperator) || (child instanceof UDTFOperator)
- || (child instanceof LimitOperator) || (child instanceof UnionOperator)) {
- cppCtx.getPrunedColLists().put(op, cppCtx.getColsFromSelectExpr(op));
+ || (child instanceof ScriptOperator)
+ || (child instanceof UDTFOperator)
+ || (child instanceof LimitOperator)
+ || (child instanceof UnionOperator)) {
+ cppCtx.getPrunedColLists()
+ .put(op, cppCtx.getColsFromSelectExpr(op));
return null;
}
}
@@ -264,18 +291,21 @@
cols = cppCtx.genColLists(op);
selectDesc conf = op.getConf();
- // The input to the select does not matter. Go over the expressions
+ // The input to the select does not matter. Go over the expressions
// and return the ones which have a marked column
- cppCtx.getPrunedColLists().put(op, cppCtx.getSelectColsFromChildren(op, cols));
-
- if(conf.isSelStarNoCompute())
+ cppCtx.getPrunedColLists().put(op,
+ cppCtx.getSelectColsFromChildren(op, cols));
+
+ if (conf.isSelStarNoCompute()) {
return null;
-
+ }
+
// do we need to prune the select operator?
List<exprNodeDesc> originalColList = op.getConf().getColList();
List<String> columns = new ArrayList<String>();
- for (exprNodeDesc expr : originalColList)
+ for (exprNodeDesc expr : originalColList) {
Utilities.mergeUniqElems(columns, expr.getCols());
+ }
// by now, 'prunedCols' are columns used by child operators, and 'columns'
// are columns used by this select operator.
ArrayList<String> originalOutputColumnNames = conf.getOutputColumnNames();
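
At this point the select processor knows both the columns its children still need (cols) and the columns its own expressions produce, so it can drop unused projections by matching output names back to expressions. A simplified sketch of that index-based pruning, with plain strings standing in for exprNodeDesc objects:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of pruning a projection: keep only the output columns the child
// operators still need, looking each one up by name to find the matching
// expression. The "expr:" strings stand in for Hive's exprNodeDesc objects.
public class SelectPruneSketch {
  public static void main(String[] args) {
    List<String> originalOutputColumnNames = Arrays.asList("_col0", "_col1", "_col2");
    List<String> originalColList = Arrays.asList("expr:key", "expr:value", "expr:ds");
    List<String> neededByChildren = Arrays.asList("_col2", "_col0");

    List<String> newOutputColumnNames = new ArrayList<String>();
    List<String> newColList = new ArrayList<String>();
    for (String col : neededByChildren) {
      int index = originalOutputColumnNames.indexOf(col);
      newOutputColumnNames.add(col);
      newColList.add(originalColList.get(index));
    }
    System.out.println(newOutputColumnNames); // [_col2, _col0]
    System.out.println(newColList);           // [expr:ds, expr:key]
  }
}
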
@@ -286,7 +316,7 @@
Vector<ColumnInfo> rs_newsignature = new Vector<ColumnInfo>();
RowResolver old_rr = cppCtx.getOpToParseCtxMap().get(op).getRR();
RowResolver new_rr = new RowResolver();
- for(String col : cols){
+ for (String col : cols) {
int index = originalOutputColumnNames.indexOf(col);
newOutputColumnNames.add(col);
newColList.add(originalColList.get(index));
@@ -312,31 +342,37 @@
*
* @param op
* @param retainedSelOutputCols
- * @throws SemanticException
+ * @throws SemanticException
*/
private void handleChildren(SelectOperator op,
- List<String> retainedSelOutputCols, ColumnPrunerProcCtx cppCtx) throws SemanticException {
- for(Operator<? extends Serializable> child: op.getChildOperators()) {
+ List<String> retainedSelOutputCols, ColumnPrunerProcCtx cppCtx)
+ throws SemanticException {
+ for (Operator<? extends Serializable> child : op.getChildOperators()) {
if (child instanceof ReduceSinkOperator) {
- boolean[] flags = getPruneReduceSinkOpRetainFlags(retainedSelOutputCols, (ReduceSinkOperator)child);
- pruneReduceSinkOperator(flags, (ReduceSinkOperator)child, cppCtx);
- }else if (child instanceof FilterOperator){
- //filter operator has the same output columns as its parent
- for(Operator<? extends Serializable> filterChild: child.getChildOperators()){
+ boolean[] flags = getPruneReduceSinkOpRetainFlags(
+ retainedSelOutputCols, (ReduceSinkOperator) child);
+ pruneReduceSinkOperator(flags, (ReduceSinkOperator) child, cppCtx);
+ } else if (child instanceof FilterOperator) {
+ // filter operator has the same output columns as its parent
+ for (Operator<? extends Serializable> filterChild : child
+ .getChildOperators()) {
if (filterChild instanceof ReduceSinkOperator) {
- boolean[] flags = getPruneReduceSinkOpRetainFlags(retainedSelOutputCols, (ReduceSinkOperator)filterChild);
- pruneReduceSinkOperator(flags, (ReduceSinkOperator)filterChild, cppCtx);
+ boolean[] flags = getPruneReduceSinkOpRetainFlags(
+ retainedSelOutputCols, (ReduceSinkOperator) filterChild);
+ pruneReduceSinkOperator(flags, (ReduceSinkOperator) filterChild,
+ cppCtx);
}
}
}
}
}
}
-
+
private static boolean[] getPruneReduceSinkOpRetainFlags(
List<String> retainedParentOpOutputCols, ReduceSinkOperator reduce) {
reduceSinkDesc reduceConf = reduce.getConf();
- java.util.ArrayList<exprNodeDesc> originalValueEval = reduceConf.getValueCols();
+ java.util.ArrayList<exprNodeDesc> originalValueEval = reduceConf
+ .getValueCols();
boolean[] flags = new boolean[originalValueEval.size()];
for (int i = 0; i < originalValueEval.size(); i++) {
flags[i] = false;
@@ -354,9 +390,10 @@
}
return flags;
}
-
+
private static void pruneReduceSinkOperator(boolean[] retainFlags,
- ReduceSinkOperator reduce, ColumnPrunerProcCtx cppCtx) throws SemanticException {
+ ReduceSinkOperator reduce, ColumnPrunerProcCtx cppCtx)
+ throws SemanticException {
reduceSinkDesc reduceConf = reduce.getConf();
Map<String, exprNodeDesc> oldMap = reduce.getColumnExprMap();
Map<String, exprNodeDesc> newMap = new HashMap<String, exprNodeDesc>();
@@ -385,10 +422,11 @@
sig.add(colInfo);
}
}
-
+
ArrayList<exprNodeDesc> keyCols = reduceConf.getKeyCols();
List<String> keys = new ArrayList<String>();
- RowResolver parResover = cppCtx.getOpToParseCtxMap().get(reduce.getParentOperators().get(0)).getRR();
+ RowResolver parResover = cppCtx.getOpToParseCtxMap().get(
+ reduce.getParentOperators().get(0)).getRR();
for (int i = 0; i < keyCols.size(); i++) {
keys = Utilities.mergeUniqElems(keys, keyCols.get(i).getCols());
}
@@ -396,29 +434,31 @@
String outputCol = keys.get(i);
String[] nm = parResover.reverseLookup(outputCol);
ColumnInfo colInfo = oldRR.get(nm[0], nm[1]);
- if (colInfo != null)
+ if (colInfo != null) {
newRR.put(nm[0], nm[1], colInfo);
+ }
}
-
+
cppCtx.getOpToParseCtxMap().get(reduce).setRR(newRR);
reduce.setColumnExprMap(newMap);
reduce.getSchema().setSignature(sig);
reduceConf.setOutputValueColumnNames(newOutputColNames);
reduceConf.setValueCols(newValueEval);
- tableDesc newValueTable = PlanUtils.getReduceValueTableDesc(PlanUtils.getFieldSchemasFromColumnList(
- reduceConf.getValueCols(), newOutputColNames, 0, ""));
+ tableDesc newValueTable = PlanUtils.getReduceValueTableDesc(PlanUtils
+ .getFieldSchemasFromColumnList(reduceConf.getValueCols(),
+ newOutputColNames, 0, ""));
reduceConf.setValueSerializeInfo(newValueTable);
}
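
pruneReduceSinkOperator, shown above, rebuilds the reduce sink's value columns and output value column names from the retain flags before regenerating the value serialization tableDesc. Here is a stripped-down sketch of that rebuild step, with strings in place of Hive's exprNodeDesc and tableDesc plumbing.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of the rebuild step: walk the value columns with their retain flags
// and keep only the flagged ones, producing the new value expressions and the
// new output value column names side by side.
public class ReduceSinkRebuildSketch {
  public static void main(String[] args) {
    List<String> valueCols = Arrays.asList("expr:key", "expr:value", "expr:ds");
    List<String> outputValueNames = Arrays.asList("_col0", "_col1", "_col2");
    boolean[] retainFlags = {true, false, true};

    List<String> newValueEval = new ArrayList<String>();
    List<String> newOutputColNames = new ArrayList<String>();
    for (int i = 0; i < retainFlags.length; i++) {
      if (retainFlags[i]) {
        newValueEval.add(valueCols.get(i));
        newOutputColNames.add(outputValueNames.get(i));
      }
    }
    System.out.println(newValueEval);      // [expr:key, expr:ds]
    System.out.println(newOutputColNames); // [_col0, _col2]
  }
}
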
-
/**
* The Factory method to get the ColumnPrunerSelectProc class.
+ *
* @return ColumnPrunerSelectProc
*/
public static ColumnPrunerSelectProc getSelectProc() {
return new ColumnPrunerSelectProc();
}
-
+
/**
* The Node Processor for Column Pruning on Join Operators.
*/
@@ -426,7 +466,8 @@
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
Object... nodeOutputs) throws SemanticException {
JoinOperator op = (JoinOperator) nd;
- pruneJoinOperator(ctx, op, op.getConf(), op.getColumnExprMap(), null, false);
+ pruneJoinOperator(ctx, op, op.getConf(), op.getColumnExprMap(), null,
+ false);
return null;
}
}
@@ -439,7 +480,7 @@
public static ColumnPrunerJoinProc getJoinProc() {
return new ColumnPrunerJoinProc();
}
-
+
/**
* The Node Processor for Column Pruning on Map Join Operators.
*/
@@ -447,27 +488,30 @@
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx,
Object... nodeOutputs) throws SemanticException {
MapJoinOperator op = (MapJoinOperator) nd;
- pruneJoinOperator(ctx, op, op.getConf(), op.getColumnExprMap(), op.getConf().getRetainList(), true);
+ pruneJoinOperator(ctx, op, op.getConf(), op.getColumnExprMap(), op
+ .getConf().getRetainList(), true);
return null;
}
}
-
+
private static void pruneJoinOperator(NodeProcessorCtx ctx,
CommonJoinOperator op, joinDesc conf,
Map<String, exprNodeDesc> columnExprMap,
- Map<Byte, List<Integer>> retainMap, boolean mapJoin) throws SemanticException {
+ Map<Byte, List<Integer>> retainMap, boolean mapJoin)
+ throws SemanticException {
ColumnPrunerProcCtx cppCtx = (ColumnPrunerProcCtx) ctx;
Map<Byte, List<String>> prunedColLists = new HashMap<Byte, List<String>>();
List<Operator<? extends Serializable>> childOperators = op
.getChildOperators();
for (Operator<? extends Serializable> child : childOperators) {
- if (child instanceof FileSinkOperator)
+ if (child instanceof FileSinkOperator) {
return;
+ }
}
- List<String> childColLists = cppCtx.genColLists((Operator<? extends Serializable>)op);
-
+ List<String> childColLists = cppCtx.genColLists(op);
+
RowResolver joinRR = cppCtx.getOpToParseCtxMap().get(op).getRR();
RowResolver newJoinRR = new RowResolver();
ArrayList<String> outputCols = new ArrayList<String>();
@@ -480,11 +524,13 @@
Byte tag = conf.getReversedExprs().get(internalName);
if (!childColLists.contains(internalName)) {
int index = conf.getExprs().get(tag).indexOf(desc);
- if (index < 0)
+ if (index < 0) {
continue;
+ }
conf.getExprs().get(tag).remove(desc);
- if (retainMap != null)
+ if (retainMap != null) {
retainMap.get(tag).remove(index);
+ }
} else {
List<String> prunedRSList = prunedColLists.get(tag);
if (prunedRSList == null) {
@@ -496,7 +542,7 @@
newColExprMap.put(internalName, desc);
}
}
-
+
if (mapJoin) {
// regenerate the valueTableDesc
List<tableDesc> valueTableDescs = new ArrayList<tableDesc>();
@@ -508,9 +554,8 @@
keyOrder.append("+");
}
- tableDesc valueTableDesc = PlanUtils
- .getMapJoinValueTableDesc(PlanUtils
- .getFieldSchemasFromColumnList(valueCols, "mapjoinvalue"));
+ tableDesc valueTableDesc = PlanUtils.getMapJoinValueTableDesc(PlanUtils
+ .getFieldSchemasFromColumnList(valueCols, "mapjoinvalue"));
valueTableDescs.add(valueTableDesc);
}
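
pruneJoinOperator walks the join's column-to-expression map, drops expressions no child needs, and groups the surviving columns by the input tag they came from so each parent reduce sink can be pruned in turn. A small sketch of that per-tag grouping, with invented tags and column names:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the per-input bookkeeping the join pruner performs: every join
// output column belongs to one input (its "tag"); columns the children still
// need are grouped by tag so each upstream ReduceSink can be pruned to just
// those. Tags and column names below are made up for illustration.
public class JoinPruneSketch {
  public static void main(String[] args) {
    Map<String, Byte> reversedExprs = new HashMap<String, Byte>();
    reversedExprs.put("_col0", (byte) 0);
    reversedExprs.put("_col1", (byte) 0);
    reversedExprs.put("_col5", (byte) 1);

    List<String> neededByChildren = Arrays.asList("_col0", "_col5");

    Map<Byte, List<String>> prunedColLists = new HashMap<Byte, List<String>>();
    for (String col : neededByChildren) {
      Byte tag = reversedExprs.get(col);
      List<String> perTag = prunedColLists.get(tag);
      if (perTag == null) {
        perTag = new ArrayList<String>();
        prunedColLists.put(tag, perTag);
      }
      perTag.add(col);
    }
    System.out.println(prunedColLists); // typically prints {0=[_col0], 1=[_col5]}
  }
}
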
@@ -564,5 +609,5 @@
public static ColumnPrunerMapJoinProc getMapJoinProc() {
return new ColumnPrunerMapJoinProc();
}
-
+
}