Posted to common-commits@hadoop.apache.org by zs...@apache.org on 2008/09/20 01:56:35 UTC
svn commit: r697291 [11/31] - in /hadoop/core/trunk: ./
src/contrib/hive/cli/src/java/org/apache/hadoop/hive/cli/
src/contrib/hive/metastore/if/
src/contrib/hive/metastore/src/gen-javabean/org/apache/hadoop/hive/metastore/api/
src/contrib/hive/metastor...
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java Fri Sep 19 16:56:30 2008
@@ -22,29 +22,43 @@
import java.util.*;
import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.io.WritableHiveObject;
-import org.apache.hadoop.hive.ql.io.WritableComparableHiveObject;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.mapredWork;
+import org.apache.hadoop.hive.ql.plan.tableDesc;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.exec.ExecMapper.reportStats;
+import org.apache.hadoop.hive.serde2.ColumnSet;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.MetadataListStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
public class ExecReducer extends MapReduceBase implements Reducer {
private JobConf jc;
- private OutputCollector oc;
- private Operator reducer;
+ private OutputCollector<?,?> oc;
+ private Operator<?> reducer;
private Reporter rp;
private boolean abort = false;
private boolean isTagged = false;
- private final HiveObject [] tagObjects = new HiveObject [Byte.MAX_VALUE];
private static String [] fieldNames;
public static final Log l4j = LogFactory.getLog("ExecReducer");
+ // TODO: move to DynamicSerDe when it's ready
+ private Deserializer inputKeyDeserializer;
+ // Input value serde needs to be an array to support different SerDe
+ // for different tags
+ private SerDe[] inputValueDeserializer = new SerDe[Byte.MAX_VALUE];
static {
ArrayList<String> fieldNameArray = new ArrayList<String> ();
for(Utilities.ReduceField r: Utilities.ReduceField.values()) {
@@ -53,20 +67,37 @@
fieldNames = fieldNameArray.toArray(new String [0]);
}
-
public void configure(JobConf job) {
jc = job;
mapredWork gWork = Utilities.getMapRedWork(job);
reducer = gWork.getReducer();
reducer.setMapredWork(gWork);
isTagged = gWork.getNeedsTagging();
-
- // create a hive object to encapsulate each one of the potential tags
- for(int i=0; i<Byte.MAX_VALUE; i++) {
- tagObjects[i] = new PrimitiveHiveObject(Byte.valueOf((byte)i));
- }
+ try {
+ // We should initialize the SerDe with the TypeInfo when available.
+ tableDesc keyTableDesc = PlanUtils.getReduceKeyDesc(gWork);
+ inputKeyDeserializer = (SerDe)ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(), null);
+ inputKeyDeserializer.initialize(null, keyTableDesc.getProperties());
+ for(int tag=0; tag<Byte.MAX_VALUE; tag++) {
+ // We should initialize the SerDe with the TypeInfo when available.
+ tableDesc valueTableDesc = PlanUtils.getReduceValueDesc(gWork, tag);
+ inputValueDeserializer[tag] = (SerDe)ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(), null);
+ inputValueDeserializer[tag].initialize(null, valueTableDesc.getProperties());
+ }
+ } catch (SerDeException e) {
+ throw new RuntimeException(e);
+ }
}
+ private Object keyObject;
+ private ObjectInspector keyObjectInspector;
+ private Object[] valueObject = new Object[Byte.MAX_VALUE];
+ private ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
+ private ObjectInspector[] rowObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
+
+ private BytesWritable groupKey;
+
+ ArrayList<Object> row = new ArrayList<Object>(3);
public void reduce(Object key, Iterator values,
OutputCollector output,
Reporter reporter) throws IOException {
@@ -85,26 +116,65 @@
}
try {
- // the key is either a WritableComparable or a NoTagWritableComparable
- HiveObject keyObject = ((WritableComparableHiveObject)key).getHo();
- //System.err.print(keyObject.toString());
- // If a operator wants to do some work at the beginning of a group
- reducer.startGroup();
- while(values.hasNext()) {
- WritableHiveObject who = (WritableHiveObject)values.next();
- //System.err.print(who.getHo().toString());
-
- LabeledCompositeHiveObject lho = new LabeledCompositeHiveObject(fieldNames);
- lho.addHiveObject(keyObject);
- lho.addHiveObject(who.getHo());
- if(isTagged) {
- lho.addHiveObject(tagObjects[who.getTag()]);
+ BytesWritable keyWritable = (BytesWritable)key;
+ byte tag = 0;
+ if (isTagged) {
+ // remove the tag
+ int size = keyWritable.getSize() - 1;
+ tag = keyWritable.get()[size];
+ keyWritable.setSize(size);
+ }
+
+ if (!keyWritable.equals(groupKey)) {
+ // If an operator wants to do some work at the beginning of a group
+ if (groupKey == null) {
+ groupKey = new BytesWritable();
+ } else {
+ // If an operator wants to do some work at the end of a group
+ l4j.trace("End Group");
+ reducer.endGroup();
}
- reducer.process(lho);
+ groupKey.set(keyWritable.get(), 0, keyWritable.getSize());
+ l4j.trace("Start Group");
+ reducer.startGroup();
+ }
+ try {
+ keyObject = inputKeyDeserializer.deserialize(keyWritable);
+ } catch (SerDeException e) {
+ throw new HiveException(e);
+ }
+ // This is a hack for generating the correct ObjectInspector.
+ // In the future, we should use DynamicSerDe and initialize it using the type info.
+ if (keyObjectInspector == null) {
+ // Directly create ObjectInspector here because we didn't know the number of cols till now.
+ keyObjectInspector = MetadataListStructObjectInspector.getInstance(((ColumnSet)keyObject).col.size());
+ }
+ // System.err.print(keyObject.toString());
+ while (values.hasNext()) {
+ Text valueText = (Text)values.next();
+ //System.err.print(who.getHo().toString());
+ try {
+ valueObject[tag] = inputValueDeserializer[tag].deserialize(valueText);
+ } catch (SerDeException e) {
+ throw new HiveException(e);
+ }
+ row.clear();
+ row.add(keyObject);
+ row.add(valueObject[tag]);
+ row.add(tag);
+ if (valueObjectInspector[tag] == null) {
+ // Directly create ObjectInspector here because we didn't know the number of cols till now.
+ valueObjectInspector[tag] = MetadataListStructObjectInspector.getInstance(((ColumnSet)valueObject[tag]).col.size());
+ ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+ ois.add(keyObjectInspector);
+ ois.add(valueObjectInspector[tag]);
+ ois.add(ObjectInspectorFactory.getStandardPrimitiveObjectInspector(Byte.class));
+ rowObjectInspector[tag] = ObjectInspectorFactory.getStandardStructObjectInspector(
+ Arrays.asList(fieldNames), ois);
+ }
+ reducer.process(row, rowObjectInspector[tag]);
}
- // If a operator wants to do some work at the end of a group
- reducer.endGroup();
} catch (HiveException e) {
abort = true;
@@ -114,6 +184,11 @@
public void close() {
try {
+ if (groupKey != null) {
+ // If an operator wants to do some work at the end of a group
+ l4j.trace("End Group");
+ reducer.endGroup();
+ }
reducer.close(abort);
reportStats rps = new reportStats (rp);
reducer.preorderMap(rps);
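
For context on the key handling above: when the plan needs tagged inputs (gWork.getNeedsTagging(), e.g. for joins), the map side appends a single tag byte to the serialized key and the reducer strips it before the group comparison, which is why inputValueDeserializer is indexed by tag. A minimal stand-alone sketch of that convention (illustrative only, not part of this commit; it uses the same BytesWritable accessors as the hunk above):

    import org.apache.hadoop.io.BytesWritable;

    public class TagSketch {
      // Map side (assumed): append the tag byte after the serialized key.
      static BytesWritable tagKey(byte[] keyBytes, byte tag) {
        byte[] tagged = new byte[keyBytes.length + 1];
        System.arraycopy(keyBytes, 0, tagged, 0, keyBytes.length);
        tagged[keyBytes.length] = tag;
        return new BytesWritable(tagged);
      }

      // Reduce side: mirrors ExecReducer.reduce() - shrink the key by one
      // byte in place and remember the tag.
      static byte stripTag(BytesWritable keyWritable) {
        int size = keyWritable.getSize() - 1;
        byte tag = keyWritable.get()[size];
        keyWritable.setSize(size);
        return tag;
      }
    }

Because the tag is stripped before keyWritable is compared with groupKey, rows for the same join key from different tables still land in one reduce group, and the end-of-group call for the final group has to be deferred to close(), as the close() hunk above does.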
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java Fri Sep 19 16:56:30 2008
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.exec;
import java.io.FileOutputStream;
+import java.io.OutputStream;
import java.io.PrintStream;
import java.io.Serializable;
import java.lang.annotation.Annotation;
@@ -28,6 +29,7 @@
import java.util.Map;
import java.util.Map.Entry;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.ql.plan.explain;
import org.apache.hadoop.hive.ql.plan.explainWork;
import org.apache.hadoop.util.StringUtils;
@@ -43,9 +45,9 @@
public int execute() {
try {
- // If this is an explain plan then return from here
- PrintStream out = new PrintStream(new FileOutputStream(work.getResFile()));
-
+ OutputStream outS = FileSystem.get(conf).create(work.getResFile());
+ PrintStream out = new PrintStream(outS);
+
// Print out the parse AST
outputAST(work.getAstStringTree(), out, 0);
out.println();
@@ -55,7 +57,8 @@
// Go over all the tasks and dump out the plans
outputStagePlans(out, work.getRootTasks(), 0);
-
+ out.close();
+
return (0);
}
catch (Exception e) {
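
The switch from FileOutputStream to FileSystem.create() means work.getResFile() can now name a path on any Hadoop filesystem (HDFS included), not just local disk, and the added out.close() flushes the stream. A small sketch of the pattern, with assumed names:

    import java.io.OutputStream;
    import java.io.PrintStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ExplainOutputSketch {
      static void writeResult(Configuration conf, Path resFile, String plan)
          throws Exception {
        OutputStream outS = FileSystem.get(conf).create(resFile);
        PrintStream out = new PrintStream(outS);
        out.println(plan);
        out.close();  // without this, an HDFS stream may lose buffered output
      }
    }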
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java Fri Sep 19 16:56:30 2008
@@ -20,25 +20,59 @@
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.exprNodeColumnDesc;
-import org.apache.hadoop.hive.serde.SerDeField;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+/**
+ * This class supports multi-level fields like "a.b.c" for historical reasons.
+ */
public class ExprNodeColumnEvaluator extends ExprNodeEvaluator {
protected exprNodeColumnDesc expr;
- transient SerDeField field;
+ transient StructObjectInspector cachedRowInspector;
+ transient String[] fieldNames;
+ transient StructField[] fields;
+ transient ObjectInspector[] fieldsObjectInspector;
public ExprNodeColumnEvaluator(exprNodeColumnDesc expr) {
this.expr = expr;
}
- public Object evaluateToObject(HiveObject row) throws HiveException {
- return evaluate(row).getJavaObject();
+ public void evaluate(Object row, ObjectInspector rowInspector,
+ InspectableObject result) throws HiveException {
+
+ assert(result != null);
+ // If this is the first row, or the dynamic structure of this row
+ // is different from the previous row
+ if (fields == null || cachedRowInspector != rowInspector) {
+ evaluateInspector(rowInspector);
+ }
+ result.o = cachedRowInspector.getStructFieldData(row, fields[0]);
+ for(int i=1; i<fields.length; i++) {
+ result.o = ((StructObjectInspector)fieldsObjectInspector[i-1]).getStructFieldData(
+ result.o, fields[i]);
+ }
+ result.oi = fieldsObjectInspector[fieldsObjectInspector.length - 1];
}
- public HiveObject evaluate(HiveObject row) throws HiveException {
- if (field == null) {
- field = row.getFieldFromExpression(expr.getColumn());
+ public ObjectInspector evaluateInspector(ObjectInspector rowInspector)
+ throws HiveException {
+
+ if (fields == null || cachedRowInspector != rowInspector) {
+ cachedRowInspector = (StructObjectInspector)rowInspector;
+ fieldNames = expr.getColumn().split("\\.", -1);
+ fields = new StructField[fieldNames.length];
+ fieldsObjectInspector = new ObjectInspector[fieldNames.length];
+ fields[0] = cachedRowInspector.getStructFieldRef(fieldNames[0]);
+ fieldsObjectInspector[0] = fields[0].getFieldObjectInspector();
+ for (int i=1; i<fieldNames.length; i++) {
+ fields[i] = ((StructObjectInspector)fieldsObjectInspector[i-1]).getStructFieldRef(fieldNames[i]);
+ fieldsObjectInspector[i] = fields[i].getFieldObjectInspector();
+ }
}
- return row.get(field);
+ return fieldsObjectInspector[fieldsObjectInspector.length - 1];
}
}
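
The evaluator above resolves a dotted column like "a.b.c" once per distinct row inspector and caches the StructField chain, then reuses it on every row. A hedged stand-alone sketch of the same resolution loop (illustrative; resolve() is a hypothetical helper, not the commit's code):

    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.StructField;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

    public class ColumnPathSketch {
      // Walk one struct level per path component; the real evaluator caches
      // fields[] so this walk happens once per inspector, not once per row.
      static Object resolve(Object row, StructObjectInspector rowInspector,
                            String columnPath) {
        String[] names = columnPath.split("\\.", -1);
        Object o = row;
        ObjectInspector oi = rowInspector;
        for (String name : names) {
          StructObjectInspector soi = (StructObjectInspector) oi;
          StructField f = soi.getStructFieldRef(name);
          o = soi.getStructFieldData(o, f);
          oi = f.getFieldObjectInspector();
        }
        return o;
      }
    }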
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeConstantEvaluator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeConstantEvaluator.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeConstantEvaluator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeConstantEvaluator.java Fri Sep 19 16:56:30 2008
@@ -20,20 +20,30 @@
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.exprNodeConstantDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
public class ExprNodeConstantEvaluator extends ExprNodeEvaluator {
protected exprNodeConstantDesc expr;
-
+ transient ObjectInspector objectInspector;
+
public ExprNodeConstantEvaluator(exprNodeConstantDesc expr) {
this.expr = expr;
+ objectInspector = ObjectInspectorFactory.getStandardPrimitiveObjectInspector(expr.getTypeInfo().getPrimitiveClass());
}
- public Object evaluateToObject(HiveObject row) throws HiveException {
- return expr.getValue();
+ public void evaluate(Object row, ObjectInspector rowInspector,
+ InspectableObject result) throws HiveException {
+ assert(result != null);
+ result.o = expr.getValue();
+ result.oi = objectInspector;
}
- public HiveObject evaluate(HiveObject r) throws HiveException {
- return new PrimitiveHiveObject(evaluateToObject(r));
+ public ObjectInspector evaluateInspector(ObjectInspector rowInspector)
+ throws HiveException {
+ return objectInspector;
}
}
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluator.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluator.java Fri Sep 19 16:56:30 2008
@@ -19,16 +19,21 @@
package org.apache.hadoop.hive.ql.exec;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
public abstract class ExprNodeEvaluator {
+
/**
- * @return plain old java object
- **/
- public abstract Object evaluateToObject(HiveObject row) throws HiveException;
+ * Evaluate the expression given the row and rowInspector.
+ * @param result result.o and result.oi will be set inside the method.
+ */
+ public abstract void evaluate(Object row, ObjectInspector rowInspector, InspectableObject result) throws HiveException;
/**
- * @return encapsulated Hive Object
- **/
- public abstract HiveObject evaluate(HiveObject row) throws HiveException;
-
+ * Metadata evaluation. Return the inspector for the expression, given the rowInspector.
+ * This method must return the same value as result.oi set by evaluate(...) with the same rowInspector.
+ */
+ public abstract ObjectInspector evaluateInspector(ObjectInspector rowInspector) throws HiveException;
+
}
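
The new contract splits data from metadata: evaluate() fills a caller-supplied InspectableObject with both the value (result.o) and its inspector (result.oi), while evaluateInspector() answers the schema question without any row. A sketch of how a caller is expected to drive it (hypothetical caller, real interfaces):

    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

    public class EvaluatorCallerSketch {
      ExprNodeEvaluator eval;                        // assumed initialized
      InspectableObject result = new InspectableObject();

      ObjectInspector outputSchema(ObjectInspector rowInspector)
          throws HiveException {
        return eval.evaluateInspector(rowInspector); // metadata only, no row
      }

      Object perRow(Object row, ObjectInspector rowInspector)
          throws HiveException {
        eval.evaluate(row, rowInspector, result);    // sets result.o / result.oi
        return result.o;                             // result object is reused
      }
    }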
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeFieldEvaluator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeFieldEvaluator.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeFieldEvaluator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeFieldEvaluator.java Fri Sep 19 16:56:30 2008
@@ -20,29 +20,59 @@
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.exprNodeFieldDesc;
-import org.apache.hadoop.hive.serde.SerDeField;
+
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
public class ExprNodeFieldEvaluator extends ExprNodeEvaluator {
protected exprNodeFieldDesc desc;
- transient ExprNodeEvaluator evaluator;
- transient SerDeField field;
+ transient ExprNodeEvaluator leftEvaluator;
+ transient InspectableObject leftInspectableObject;
+ transient StructObjectInspector cachedLeftObjectInspector;
+ transient StructField field;
+ transient ObjectInspector fieldObjectInspector;
public ExprNodeFieldEvaluator(exprNodeFieldDesc desc) {
this.desc = desc;
- evaluator = ExprNodeEvaluatorFactory.get(desc.getDesc());
+ leftEvaluator = ExprNodeEvaluatorFactory.get(desc.getDesc());
+ field = null;
+ leftInspectableObject = new InspectableObject();
}
+ public void evaluate(Object row, ObjectInspector rowInspector,
+ InspectableObject result) throws HiveException {
+
+ assert(result != null);
+ // Get the result in leftInspectableObject
+ leftEvaluator.evaluate(row, rowInspector, leftInspectableObject);
- public Object evaluateToObject(HiveObject row) throws HiveException {
- return evaluate(row).getJavaObject();
+ if (field == null) {
+ cachedLeftObjectInspector = (StructObjectInspector)leftInspectableObject.oi;
+ field = cachedLeftObjectInspector.getStructFieldRef(desc.getFieldName());
+ fieldObjectInspector = field.getFieldObjectInspector();
+ } else {
+ assert(cachedLeftObjectInspector == leftInspectableObject.oi);
+ }
+ result.oi = fieldObjectInspector;
+ result.o = cachedLeftObjectInspector.getStructFieldData(leftInspectableObject.o, field);
}
- public HiveObject evaluate(HiveObject row) throws HiveException {
- HiveObject ho = evaluator.evaluate(row);
+ public ObjectInspector evaluateInspector(ObjectInspector rowInspector)
+ throws HiveException {
+ // If this is the first row, or the dynamic structure of leftInspectableObject
+ // is different from the previous row
+ leftInspectableObject.oi = leftEvaluator.evaluateInspector(rowInspector);
if (field == null) {
- field = ho.getFieldFromExpression(desc.getFieldName());
+ cachedLeftObjectInspector = (StructObjectInspector)leftInspectableObject.oi;
+ field = cachedLeftObjectInspector.getStructFieldRef(desc.getFieldName());
+ fieldObjectInspector = field.getFieldObjectInspector();
+ } else {
+ assert(cachedLeftObjectInspector == leftInspectableObject.oi);
}
- return ho.get(field);
+ return fieldObjectInspector;
}
+
}
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeFuncEvaluator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeFuncEvaluator.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeFuncEvaluator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeFuncEvaluator.java Fri Sep 19 16:56:30 2008
@@ -18,13 +18,16 @@
package org.apache.hadoop.hive.ql.exec;
-import java.util.ArrayList;
+import java.lang.reflect.Method;
import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.exprNodeFuncDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.util.ReflectionUtils;
public class ExprNodeFuncEvaluator extends ExprNodeEvaluator {
@@ -32,41 +35,60 @@
private static final Log LOG = LogFactory.getLog(ExprNodeFuncEvaluator.class.getName());
protected exprNodeFuncDesc expr;
- transient ArrayList<ExprNodeEvaluator> evaluators;
- transient Object[] children;
+ transient ExprNodeEvaluator[] paramEvaluators;
+ transient InspectableObject[] paramInspectableObjects;
+ transient Object[] paramValues;
transient UDF udf;
+ transient Method udfMethod;
+ transient ObjectInspector outputObjectInspector;
public ExprNodeFuncEvaluator(exprNodeFuncDesc expr) {
this.expr = expr;
assert(expr != null);
Class<?> c = expr.getUDFClass();
- LOG.info(c.toString());
+ udfMethod = expr.getUDFMethod();
+ LOG.debug(c.toString());
+ LOG.debug(udfMethod.toString());
udf = (UDF)ReflectionUtils.newInstance(expr.getUDFClass(), null);
- evaluators = new ArrayList<ExprNodeEvaluator>();
- for(int i=0; i<expr.getChildren().size(); i++) {
- evaluators.add(ExprNodeEvaluatorFactory.get(expr.getChildren().get(i)));
+ int paramNumber = expr.getChildren().size();
+ paramEvaluators = new ExprNodeEvaluator[paramNumber];
+ paramInspectableObjects = new InspectableObject[paramNumber];
+ for(int i=0; i<paramNumber; i++) {
+ paramEvaluators[i] = ExprNodeEvaluatorFactory.get(expr.getChildren().get(i));
+ paramInspectableObjects[i] = new InspectableObject();
}
- children = new Object[expr.getChildren().size()];
+ paramValues = new Object[expr.getChildren().size()];
+ outputObjectInspector = ObjectInspectorFactory.getStandardPrimitiveObjectInspector(
+ udfMethod.getReturnType());
}
- public Object evaluateToObject(HiveObject row) throws HiveException {
+ public void evaluate(Object row, ObjectInspector rowInspector,
+ InspectableObject result) throws HiveException {
+ if (result == null) {
+ throw new HiveException("result cannot be null.");
+ }
// Evaluate all children first
- for(int i=0; i<evaluators.size(); i++) {
- Object o = evaluators.get(i).evaluateToObject(row);
- children[i] = o;
+ for(int i=0; i<paramEvaluators.length; i++) {
+ paramEvaluators[i].evaluate(row, rowInspector, paramInspectableObjects[i]);
+ paramValues[i] = paramInspectableObjects[i].o;
}
try {
- return expr.getUDFMethod().invoke(udf, children);
+ result.o = udfMethod.invoke(udf, paramValues);
+ result.oi = outputObjectInspector;
} catch (Exception e) {
- throw new HiveException("Unable to execute UDF function " + udf.getClass() + " "
- + expr.getUDFMethod() + " on inputs " + "(" + children.length + ") " + Arrays.asList(children) + ": " + e.getMessage(), e);
+ if (e instanceof HiveException) {
+ throw (HiveException)e;
+ } else if (e instanceof RuntimeException) {
+ throw (RuntimeException)e;
+ } else {
+ throw new HiveException("Unable to execute UDF function " + udf.getClass() + " "
+ + udfMethod + " on inputs " + "(" + paramValues.length + ") " + Arrays.asList(paramValues) + ": " + e.getMessage(), e);
+ }
}
}
- public HiveObject evaluate(HiveObject row) throws HiveException {
- Object obj = evaluateToObject(row);
- if (obj == null)
- return new NullHiveObject();
- return new PrimitiveHiveObject(obj);
+ public ObjectInspector evaluateInspector(ObjectInspector rowInspector)
+ throws HiveException {
+ return outputObjectInspector;
}
}
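
The evaluator now caches the resolved java.lang.reflect.Method (udfMethod) instead of re-fetching it per row, and rethrows HiveException/RuntimeException as-is rather than wrapping them twice. A tiny self-contained illustration of the cached reflective dispatch (UpperUdf is a made-up stand-in, not a Hive UDF):

    import java.lang.reflect.Method;

    public class UdfDispatchSketch {
      public static class UpperUdf {               // stand-in for a UDF subclass
        public String evaluate(String s) {
          return s == null ? null : s.toUpperCase();
        }
      }

      public static void main(String[] args) throws Exception {
        UpperUdf udf = new UpperUdf();
        // Resolved once, like expr.getUDFMethod() in the constructor above.
        Method udfMethod = UpperUdf.class.getMethod("evaluate", String.class);
        Object[] paramValues = { "hive" };
        System.out.println(udfMethod.invoke(udf, paramValues));  // prints HIVE
      }
    }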
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeIndexEvaluator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeIndexEvaluator.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeIndexEvaluator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeIndexEvaluator.java Fri Sep 19 16:56:30 2008
@@ -20,14 +20,17 @@
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.exprNodeIndexDesc;
-import org.apache.hadoop.hive.serde.SerDeField;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
public class ExprNodeIndexEvaluator extends ExprNodeEvaluator {
protected exprNodeIndexDesc expr;
transient ExprNodeEvaluator mainEvaluator;
+ transient InspectableObject mainInspectableObject = new InspectableObject();
transient ExprNodeEvaluator indexEvaluator;
- transient SerDeField field;
+ transient InspectableObject indexInspectableObject = new InspectableObject();
public ExprNodeIndexEvaluator(exprNodeIndexDesc expr) {
this.expr = expr;
@@ -35,18 +38,22 @@
indexEvaluator = ExprNodeEvaluatorFactory.get(expr.getIndex());
}
- public Object evaluateToObject(HiveObject row) throws HiveException {
- return evaluate(row).getJavaObject();
+ public void evaluate(Object row, ObjectInspector rowInspector,
+ InspectableObject result) throws HiveException {
+
+ assert(result != null);
+ mainEvaluator.evaluate(row, rowInspector, mainInspectableObject);
+ indexEvaluator.evaluate(row, rowInspector, indexInspectableObject);
+ int index = ((Number)indexInspectableObject.o).intValue();
+
+ ListObjectInspector loi = (ListObjectInspector)mainInspectableObject.oi;
+ result.oi = loi.getListElementObjectInspector();
+ result.o = loi.getListElement(mainInspectableObject.o, index);
}
- public HiveObject evaluate(HiveObject row) throws HiveException {
- HiveObject ho = mainEvaluator.evaluate(row);
- if (field == null || !(indexEvaluator instanceof ExprNodeConstantEvaluator)) {
- // TODO: This optimization is wrong because of the field implementation inside HiveObject.
- // The problem is that at the second "[" (after "c"), "field" caches both "index1" and
- // "index2" in "a.b[index1].c[index2]", while it should only cache "index2".
- field = ho.getFieldFromExpression("[" + indexEvaluator.evaluateToObject(row) + "]");
- }
- return ho.get(field);
+ public ObjectInspector evaluateInspector(ObjectInspector rowInspector)
+ throws HiveException {
+ return ((ListObjectInspector)mainEvaluator.evaluateInspector(rowInspector)).getListElementObjectInspector();
}
+
}
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeNullEvaluator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeNullEvaluator.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeNullEvaluator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeNullEvaluator.java Fri Sep 19 16:56:30 2008
@@ -20,6 +20,8 @@
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.exprNodeNullDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
// This function will not be used currently, since the function expressions
// change the void to the first matching argument
@@ -31,11 +33,14 @@
this.expr = expr;
}
- public Object evaluateToObject(HiveObject row) throws HiveException {
- return expr.getValue();
+ public void evaluate(Object row, ObjectInspector rowInspector,
+ InspectableObject result) throws HiveException {
+ throw new HiveException("Hive 2 Internal exception: should not reach here.");
}
- public HiveObject evaluate(HiveObject r) throws HiveException {
- return new NullHiveObject();
+ public ObjectInspector evaluateInspector(ObjectInspector rowInspector)
+ throws HiveException {
+ throw new HiveException("Hive 2 Internal exception: should not reach here.");
}
+
}
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java Fri Sep 19 16:56:30 2008
@@ -22,6 +22,8 @@
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.extractDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.conf.Configuration;
/**
@@ -31,13 +33,15 @@
public class ExtractOperator extends Operator<extractDesc> implements Serializable {
private static final long serialVersionUID = 1L;
transient protected ExprNodeEvaluator eval;
+ transient protected InspectableObject result = new InspectableObject();
public void initialize(Configuration hconf) throws HiveException {
super.initialize(hconf);
eval = ExprNodeEvaluatorFactory.get(conf.getCol());
}
- public void process(HiveObject r) throws HiveException {
- forward (eval.evaluate(r));
+ public void process(Object row, ObjectInspector rowInspector) throws HiveException {
+ eval.evaluate(row, rowInspector, result);
+ forward(result.o, result.oi);
}
}
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Fri Sep 19 16:56:30 2008
@@ -23,16 +23,14 @@
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.fs.*;
-import org.apache.hadoop.util.*;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.fileSinkDesc;
import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
-import org.apache.hadoop.hive.serde.SerDe;
-import org.apache.hadoop.hive.serde.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
/**
* File Sink operator implementation
@@ -49,7 +47,7 @@
transient protected FileSystem fs;
transient protected Path outPath;
transient protected Path finalPath;
- transient protected SerDe serDe;
+ transient protected Serializer serializer;
transient protected BytesWritable commonKey = new BytesWritable();
private void commit() throws IOException {
@@ -79,13 +77,31 @@
public void initialize(Configuration hconf) throws HiveException {
super.initialize(hconf);
try {
+ serializer = (Serializer)conf.getTableInfo().getDeserializerClass().newInstance();
+ serializer.initialize(null, conf.getTableInfo().getProperties());
+
+ JobConf jc;
+ if(hconf instanceof JobConf) {
+ jc = (JobConf)hconf;
+ } else {
+ // test code path
+ jc = new JobConf(hconf, ExecDriver.class);
+ }
+
fs = FileSystem.get(hconf);
finalPath = new Path(conf.getDirName(), Utilities.getTaskId(hconf));
outPath = new Path(conf.getDirName(), "_tmp."+Utilities.getTaskId(hconf));
- OutputFormat outputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
+ OutputFormat<?, ?> outputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
+ final Class<? extends Writable> outputClass = serializer.getSerializedClass();
+ boolean isCompressed = FileOutputFormat.getCompressOutput(jc);
+ // The reason to keep these instead of using OutputFormat.getRecordWriter() is that
+ // getRecordWriter does not give us enough control over the file name that we create.
if(outputFormat instanceof IgnoreKeyTextOutputFormat) {
- final FSDataOutputStream outStream = fs.create(outPath);
+ if(isCompressed) {
+ finalPath = new Path(conf.getDirName(), Utilities.getTaskId(hconf) + ".gz");
+ }
+ final OutputStream outStream = Utilities.createCompressedStream(jc, fs.create(outPath));
outWriter = new RecordWriter () {
public void write(Writable r) throws IOException {
Text tr = (Text)r;
@@ -98,7 +114,7 @@
};
} else if (outputFormat instanceof SequenceFileOutputFormat) {
final SequenceFile.Writer outStream =
- SequenceFile.createWriter(fs, hconf, outPath, BytesWritable.class, Text.class);
+ Utilities.createSequenceWriter(jc, fs, outPath, BytesWritable.class, outputClass);
outWriter = new RecordWriter () {
public void write(Writable r) throws IOException {
outStream.append(commonKey, r);
@@ -109,20 +125,22 @@
};
} else {
// should never come here - we should be catching this in ddl command
- assert(false);
+ throw new HiveException ("Illegal outputformat: " + outputFormat.getClass().getName());
}
- serDe = conf.getTableInfo().getSerdeClass().newInstance();
+ } catch (HiveException e) {
+ throw e;
} catch (Exception e) {
e.printStackTrace();
throw new HiveException(e);
}
}
- public void process(HiveObject r) throws HiveException {
+ Writable recordValue;
+ public void process(Object row, ObjectInspector rowInspector) throws HiveException {
try {
// use SerDe to serialize r, and write it out
- Writable value = serDe.serialize(r.getJavaObject());
- outWriter.write(value);
+ recordValue = serializer.serialize(row, rowInspector);
+ outWriter.write(recordValue);
} catch (IOException e) {
throw new HiveException (e);
} catch (SerDeException e) {
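
Utilities.createCompressedStream() and createSequenceWriter() are referenced here but defined elsewhere in the commit. A plausible sketch of what the stream helper does with the standard Hadoop compression API (the body below is an assumption, not the commit's code):

    import java.io.OutputStream;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.util.ReflectionUtils;

    public class CompressedStreamSketch {
      static OutputStream createCompressedStream(JobConf jc, OutputStream out)
          throws Exception {
        if (!FileOutputFormat.getCompressOutput(jc)) {
          return out;                     // compression off: pass through
        }
        Class<? extends CompressionCodec> codecClass =
            FileOutputFormat.getOutputCompressorClass(jc, GzipCodec.class);
        CompressionCodec codec = ReflectionUtils.newInstance(codecClass, jc);
        return codec.createOutputStream(out);  // matches the ".gz" suffix above
      }
    }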
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java Fri Sep 19 16:56:30 2008
@@ -23,6 +23,8 @@
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.hive.ql.plan.filterDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.conf.Configuration;
/**
@@ -33,18 +35,20 @@
private static final long serialVersionUID = 1L;
public static enum Counter {FILTERED, PASSED}
transient private final LongWritable filtered_count, passed_count;
- transient private ExprNodeEvaluator eval;
-
+ transient private ExprNodeEvaluator conditionEvaluator;
+ transient private InspectableObject conditionInspectableObject;
+
public FilterOperator () {
super();
filtered_count = new LongWritable();
passed_count = new LongWritable();
+ conditionInspectableObject = new InspectableObject();
}
public void initialize(Configuration hconf) throws HiveException {
super.initialize(hconf);
try {
- this.eval = ExprNodeEvaluatorFactory.get(conf.getPredicate());
+ this.conditionEvaluator = ExprNodeEvaluatorFactory.get(conf.getPredicate());
statsMap.put(Counter.FILTERED, filtered_count);
statsMap.put(Counter.PASSED, passed_count);
} catch (Throwable e) {
@@ -53,11 +57,12 @@
}
}
- public void process(HiveObject r) throws HiveException {
+ public void process(Object row, ObjectInspector rowInspector) throws HiveException {
try {
- Boolean ret = (Boolean)(eval.evaluateToObject(r));
+ conditionEvaluator.evaluate(row, rowInspector, conditionInspectableObject);
+ Boolean ret = (Boolean)(conditionInspectableObject.o);
if (Boolean.TRUE.equals(ret)) {
- forward(r);
+ forward(row, rowInspector);
passed_count.set(passed_count.get()+1);
} else {
filtered_count.set(filtered_count.get()+1);
@@ -65,9 +70,7 @@
} catch (ClassCastException e) {
e.printStackTrace();
throw new HiveException("Non Boolean return Object type: " +
- eval.evaluateToObject(r).getClass().getName());
- } catch (NullPointerException e) {
- throw new HiveException("NullPointerException in FilterOperator ", e);
+ conditionInspectableObject.o.getClass().getName());
}
}
}
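
One likely reason the NullPointerException catch could be dropped: Boolean.TRUE.equals(ret) is null-safe, so a predicate that evaluates to null (e.g. over a NULL column) simply counts as filtered rather than throwing, and the ClassCastException message now reads the cached conditionInspectableObject.o instead of re-evaluating the expression. A one-line demonstration:

    public class NullSafeFilterSketch {
      public static void main(String[] args) {
        Boolean ret = null;                           // predicate result on NULLs
        System.out.println(Boolean.TRUE.equals(ret)); // false - no NPE thrown
      }
    }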
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java Fri Sep 19 16:56:30 2008
@@ -22,6 +22,7 @@
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.forwardDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.conf.Configuration;
/**
@@ -35,7 +36,9 @@
// nothing to do really ..
}
- public void process(HiveObject r) throws HiveException {
- forward(r);
+ @Override
+ public void process(Object row, ObjectInspector rowInspector)
+ throws HiveException {
+ forward(row, rowInspector);
}
}
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java Fri Sep 19 16:56:30 2008
@@ -29,8 +29,8 @@
import java.lang.Void;
import org.apache.hadoop.hive.ql.exec.FunctionInfo.OperatorType;
-import org.apache.hadoop.hive.ql.parse.TypeInfo;
import org.apache.hadoop.hive.ql.udf.*;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
public class FunctionRegistry {
@@ -252,7 +252,7 @@
for(int i=0; i<argumentClasses.size() && match; i++) {
if (argumentClasses.get(i) == Void.class) continue;
- Class<?> accepted = TypeInfo.generalizePrimitive(argumentTypeInfos[i]);
+ Class<?> accepted = ObjectInspectorUtils.generalizePrimitive(argumentTypeInfos[i]);
if (accepted.isAssignableFrom(argumentClasses.get(i))) {
// do nothing if match
} else if (!exact && implicitConvertable(argumentClasses.get(i), accepted)) {
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Fri Sep 19 16:56:30 2008
@@ -28,7 +28,9 @@
import org.apache.hadoop.hive.ql.plan.aggregationDesc;
import org.apache.hadoop.hive.ql.plan.exprNodeDesc;
import org.apache.hadoop.hive.ql.plan.groupByDesc;
-import org.apache.hadoop.hive.serde.SerDeField;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.conf.Configuration;
/**
@@ -47,15 +49,18 @@
transient protected Method[] aggregationsAggregateMethods;
transient protected Method[] aggregationsEvaluateMethods;
- transient protected List<SerDeField> choKeyFields;
+ transient protected ArrayList<ObjectInspector> objectInspectors;
+ transient protected ObjectInspector outputObjectInspector;
// Used by sort-based GroupBy: Mode = COMPLETE, PARTIAL1, PARTIAL2
- transient protected CompositeHiveObject currentKeys;
+ transient protected ArrayList<Object> currentKeys;
transient protected UDAF[] aggregations;
transient protected Object[][] aggregationsParametersLastInvoke;
// Used by hash-based GroupBy: Mode = HASH
- transient protected HashMap<CompositeHiveObject, UDAF[]> hashAggregations;
+ transient protected HashMap<ArrayList<Object>, UDAF[]> hashAggregations;
+
+ transient boolean firstRow;
public void initialize(Configuration hconf) throws HiveException {
super.initialize(hconf);
@@ -124,8 +129,20 @@
aggregationsParametersLastInvoke = new Object[conf.getAggregators().size()][];
aggregations = newAggregations();
} else {
- hashAggregations = new HashMap<CompositeHiveObject, UDAF[]>();
+ hashAggregations = new HashMap<ArrayList<Object>, UDAF[]>();
+ }
+ // init objectInspectors
+ int totalFields = keyFields.length + aggregationClasses.length;
+ objectInspectors = new ArrayList<ObjectInspector>(totalFields);
+ for(int i=0; i<keyFields.length; i++) {
+ objectInspectors.add(null);
}
+ for(int i=0; i<aggregationClasses.length; i++) {
+ objectInspectors.add(ObjectInspectorFactory.getStandardPrimitiveObjectInspector(
+ aggregationsEvaluateMethods[i].getReturnType()));
+ }
+
+ firstRow = true;
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
@@ -141,12 +158,15 @@
return aggs;
}
- protected void updateAggregations(UDAF[] aggs, HiveObject row, Object[][] lastInvoke) throws Exception {
+ InspectableObject tempInspectableObject = new InspectableObject();
+
+ protected void updateAggregations(UDAF[] aggs, Object row, ObjectInspector rowInspector, Object[][] lastInvoke) throws Exception {
for(int ai=0; ai<aggs.length; ai++) {
// Calculate the parameters
Object[] o = new Object[aggregationParameterFields[ai].length];
for(int pi=0; pi<aggregationParameterFields[ai].length; pi++) {
- o[pi] = aggregationParameterFields[ai][pi].evaluateToObject(row);
+ aggregationParameterFields[ai][pi].evaluate(row, rowInspector, tempInspectableObject);
+ o[pi] = tempInspectableObject.o;
}
// Update the aggregations.
if (aggregationIsDistinct[ai] && lastInvoke != null) {
@@ -170,36 +190,48 @@
}
}
- public void process(HiveObject row) throws HiveException {
+ public void process(Object row, ObjectInspector rowInspector) throws HiveException {
try {
// Compute the keys
- ArrayList<HiveObject> keys = new ArrayList<HiveObject>(keyFields.length);
+ ArrayList<Object> newKeys = new ArrayList<Object>(keyFields.length);
for (int i = 0; i < keyFields.length; i++) {
- keys.add(keyFields[i].evaluate(row));
+ keyFields[i].evaluate(row, rowInspector, tempInspectableObject);
+ newKeys.add(tempInspectableObject.o);
+ if (firstRow) {
+ objectInspectors.set(i, tempInspectableObject.oi);
+ }
+ }
+ if (firstRow) {
+ firstRow = false;
+ ArrayList<String> fieldNames = new ArrayList<String>(objectInspectors.size());
+ for(int i=0; i<objectInspectors.size(); i++) {
+ fieldNames.add(Integer.valueOf(i).toString());
+ }
+ outputObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+ fieldNames, objectInspectors);
}
- CompositeHiveObject newKeys = new CompositeHiveObject(keys);
-
// Prepare aggs for updating
UDAF[] aggs = null;
Object[][] lastInvoke = null;
if (aggregations != null) {
// sort-based aggregation
// Need to forward?
- if (currentKeys != null && !newKeys.equals(currentKeys)) {
- forward(currentKeys, aggregations);
+ boolean keysAreEqual = newKeys.equals(currentKeys);
+ if (currentKeys != null && !keysAreEqual) {
+ forward(currentKeys, aggregations);
}
// Need to update the keys?
- if (currentKeys == null || !newKeys.equals(currentKeys)) {
- currentKeys = newKeys;
- // init aggregations
- for(UDAF aggregation: aggregations) {
- aggregation.init();
- }
- // clear parameters in last-invoke
- for(int i=0; i<aggregationsParametersLastInvoke.length; i++) {
- aggregationsParametersLastInvoke[i] = null;
- }
+ if (currentKeys == null || !keysAreEqual) {
+ currentKeys = newKeys;
+ // init aggregations
+ for(UDAF aggregation: aggregations) {
+ aggregation.init();
+ }
+ // clear parameters in last-invoke
+ for(int i=0; i<aggregationsParametersLastInvoke.length; i++) {
+ aggregationsParametersLastInvoke[i] = null;
+ }
}
aggs = aggregations;
lastInvoke = aggregationsParametersLastInvoke;
@@ -215,7 +247,7 @@
}
// Update the aggs
- updateAggregations(aggs, row, lastInvoke);
+ updateAggregations(aggs, row, rowInspector, lastInvoke);
} catch (Exception e) {
e.printStackTrace();
@@ -230,23 +262,16 @@
* The keys in the record
* @throws HiveException
*/
- protected void forward(CompositeHiveObject keys, UDAF[] aggs) throws Exception {
- if (choKeyFields == null) {
- // init choKeyFields
- choKeyFields = new ArrayList<SerDeField>();
- for (int i = 0; i < keyFields.length; i++) {
- choKeyFields.add(keys.getFieldFromExpression(Integer.valueOf(i).toString()));
- }
+ protected void forward(ArrayList<Object> keys, UDAF[] aggs) throws Exception {
+ int totalFields = keys.size() + aggs.length;
+ List<Object> a = new ArrayList<Object>(totalFields);
+ for(int i=0; i<keys.size(); i++) {
+ a.add(keys.get(i));
}
- int totalFields = keys.width + aggs.length;
- CompositeHiveObject cho = new CompositeHiveObject(totalFields);
- for (int i = 0; i < keys.width; i++) {
- cho.addHiveObject(keys.get(choKeyFields.get(i)));
+ for(int i=0; i<aggs.length; i++) {
+ a.add(aggregationsEvaluateMethods[i].invoke(aggs[i]));
}
- for (int i = 0; i < aggs.length; i++) {
- cho.addHiveObject(new PrimitiveHiveObject(aggregationsEvaluateMethods[i].invoke(aggs[i])));
- }
- forward(cho);
+ forward(a, outputObjectInspector);
}
/**
@@ -263,7 +288,7 @@
}
} else if (hashAggregations != null) {
// hash-based aggregations
- for (CompositeHiveObject key: hashAggregations.keySet()) {
+ for (ArrayList<Object> key: hashAggregations.keySet()) {
forward(key, hashAggregations.get(key));
}
} else {
@@ -278,4 +303,5 @@
}
super.close(abort);
}
+
}
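
Both the sort-based path (currentKeys) and the hash path now key on plain ArrayList<Object>, which works because ArrayList supplies element-wise equals() and hashCode(). A small stand-alone illustration of hash-mode grouping on such keys (made-up data):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;

    public class HashKeySketch {
      public static void main(String[] args) {
        HashMap<ArrayList<Object>, long[]> counts =
            new HashMap<ArrayList<Object>, long[]>();
        Object[] keys = { "a", "b", "a" };
        for (Object k : keys) {
          ArrayList<Object> key = new ArrayList<Object>(Arrays.asList(k));
          long[] agg = counts.get(key);  // hits when all key columns are equal
          if (agg == null) { agg = new long[1]; counts.put(key, agg); }
          agg[0]++;
        }
        ArrayList<Object> probe =
            new ArrayList<Object>(Arrays.asList((Object) "a"));
        System.out.println(counts.get(probe)[0]);     // prints 2
      }
    }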
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/HiveObject.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/HiveObject.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/HiveObject.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/HiveObject.java Fri Sep 19 16:56:30 2008
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec;
-
-import java.io.*;
-import java.util.*;
-
-import org.apache.hadoop.hive.serde.*;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.io.*;
-import org.apache.hadoop.hive.utils.ByteStream;
-
-/**
- * Data for each row is passed around as HiveObjects in Hive
- */
-
-public abstract class HiveObject {
-
- protected Object javaObject;
-
- protected boolean isNull;
-
- /**
- * @param expr a well formed expression nesting within this Hive Object
- * @return field handler that can be used in a subsequent get() call
- */
- public abstract SerDeField getFieldFromExpression(String expr) throws HiveException;
-
- /**
- * @param field obtained using call to getFieldFromExpression
- * @return another subObject
- */
- public abstract HiveObject get(SerDeField field) throws HiveException;
-
- /**
- * @return get the current HiveObject as a Java Object
- */
- public Object getJavaObject() throws HiveException {
- if (isNull) return null;
- return javaObject;
- }
-
- /**
- * @return get isNull
- */
- public boolean getIsNull() {
- return isNull;
- }
-
- public void setIsNull(boolean isNull) {
- this.isNull = isNull;
- }
-
- /**
- * @return list of top level fields in this Hive Object
- */
- public abstract List<SerDeField> getFields() throws HiveException;
-
- /**
- * Used to detect base case of object hierarchy
- * @return true if the Object encapsulates a Hive Primitive Object. False otherwise
- */
- public abstract boolean isPrimitive();
-
- public abstract int hashCode();
-
- public abstract boolean equals(Object other);
-
- public String toString () {
- try {
- HiveObjectSerializer hos = new NaiiveSerializer();
- ByteStream.Output bos = new ByteStream.Output ();
- hos.serialize(this, new DataOutputStream(bos));
- return new String(bos.getData(), 0, bos.getCount(), "UTF-8");
- } catch (Exception e) {
- return ("Exception: "+e.getMessage());
- }
- }
-
- public static final ArrayList<SerDeField> nlist = new ArrayList<SerDeField> (0);
-}
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Fri Sep 19 16:56:30 2008
@@ -22,7 +22,6 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.Stack;
import java.util.Vector;
@@ -32,8 +31,11 @@
import org.apache.hadoop.hive.ql.plan.exprNodeDesc;
import org.apache.hadoop.hive.ql.plan.joinCond;
import org.apache.hadoop.hive.ql.plan.joinDesc;
-import org.apache.hadoop.hive.serde.SerDeField;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
/**
* Join operator implementation.
@@ -43,35 +45,29 @@
// a list of value expressions for each alias are maintained
public static class JoinExprMap {
ExprNodeEvaluator[] valueFields;
- List<SerDeField> listFields;
- public JoinExprMap(ExprNodeEvaluator[] valueFields,
- List<SerDeField> listFields) {
+ public JoinExprMap(ExprNodeEvaluator[] valueFields) {
this.valueFields = valueFields;
- this.listFields = listFields;
}
public ExprNodeEvaluator[] getValueFields() {
return valueFields;
}
- public List<SerDeField> getListFields() {
- return listFields;
- }
}
public static class IntermediateObject{
- CompositeHiveObject[] objs;
+ ArrayList<Object>[] objs;
int curSize;
- public IntermediateObject(CompositeHiveObject[] objs, int curSize) {
+ public IntermediateObject(ArrayList<Object>[] objs, int curSize) {
this.objs = objs;
this.curSize = curSize;
}
- public CompositeHiveObject[] getObjs() { return objs; }
+ public ArrayList<Object>[] getObjs() { return objs; }
public int getCurSize() { return curSize; }
- public void pushObj(CompositeHiveObject obj) { objs[curSize++] = obj; }
+ public void pushObj(ArrayList<Object> obj) { objs[curSize++] = obj; }
public void popObj() { curSize--; }
}
@@ -81,23 +77,24 @@
transient static protected Byte[] order; // order in which the results should be outputted
transient protected joinCond[] condn;
transient protected boolean noOuterJoin;
- transient private HiveObject[] dummyObj; // for outer joins, contains the potential nulls for the concerned aliases
- transient private Vector<CompositeHiveObject>[] dummyObjVectors;
- transient private Stack<Iterator<CompositeHiveObject>> iterators;
+ transient private Object[] dummyObj; // for outer joins, contains the potential nulls for the concerned aliases
+ transient private Vector<ArrayList<Object>>[] dummyObjVectors;
+ transient private Stack<Iterator<ArrayList<Object>>> iterators;
transient private int totalSz; // total size of the composite object
-
+ transient ObjectInspector joinOutputObjectInspector;
+
static
{
aliasField = ExprNodeEvaluatorFactory.get(new exprNodeColumnDesc(String.class, Utilities.ReduceField.ALIAS.toString()));
}
- HashMap<Byte, Vector<CompositeHiveObject>> storage;
+ HashMap<Byte, Vector<ArrayList<Object>>> storage;
public void initialize(Configuration hconf) throws HiveException {
super.initialize(hconf);
totalSz = 0;
// Map that contains the rows for each alias
- storage = new HashMap<Byte, Vector<CompositeHiveObject>>();
+ storage = new HashMap<Byte, Vector<ArrayList<Object>>>();
numValues = conf.getExprs().size();
joinExprs = new HashMap<Byte, JoinExprMap>();
@@ -123,51 +120,61 @@
for (int j = 0; j < sz; j++)
valueFields[j] = ExprNodeEvaluatorFactory.get(expr.get(j));
- joinExprs.put(key, new JoinExprMap(valueFields, CompositeHiveObject
- .getFields(sz)));
+ joinExprs.put(key, new JoinExprMap(valueFields));
}
- dummyObj = new HiveObject[numValues];
+ ArrayList<ObjectInspector> structFieldObjectInspectors = new ArrayList<ObjectInspector>(totalSz);
+ for(int i=0; i<totalSz; i++) {
+ structFieldObjectInspectors.add(ObjectInspectorFactory.getStandardPrimitiveObjectInspector(String.class));
+ }
+ joinOutputObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+ ObjectInspectorUtils.getIntegerArray(totalSz), structFieldObjectInspectors);
+
+ dummyObj = new Object[numValues];
dummyObjVectors = new Vector[numValues];
int pos = 0;
for (Byte alias : order) {
int sz = map.get(alias).size();
- CompositeHiveObject nr = new CompositeHiveObject(sz);
+ ArrayList<Object> nr = new ArrayList<Object>(sz);
for (int j = 0; j < sz; j++)
- nr.addHiveObject(null);
+ nr.add(null);
dummyObj[pos] = nr;
- Vector<CompositeHiveObject> values = new Vector<CompositeHiveObject>();
- values.add((CompositeHiveObject) dummyObj[pos]);
+ Vector<ArrayList<Object>> values = new Vector<ArrayList<Object>>();
+ values.add((ArrayList<Object>) dummyObj[pos]);
dummyObjVectors[pos] = values;
pos++;
}
- iterators = new Stack<Iterator<CompositeHiveObject>>();
+ iterators = new Stack<Iterator<ArrayList<Object>>>();
}
public void startGroup() throws HiveException {
- l4j.trace("Join: Starting new group");
+ LOG.trace("Join: Starting new group");
storage.clear();
for (Byte alias : order)
- storage.put(alias, new Vector<CompositeHiveObject>());
+ storage.put(alias, new Vector<ArrayList<Object>>());
}
- public void process(HiveObject row) throws HiveException {
+ InspectableObject tempAliasInspectableObject = new InspectableObject();
+ public void process(Object row, ObjectInspector rowInspector) throws HiveException {
try {
// get alias
- Byte alias = (Byte) (aliasField.evaluate(row).getJavaObject());
+ aliasField.evaluate(row, rowInspector, tempAliasInspectableObject);
+ Byte alias = (Byte) (tempAliasInspectableObject.o);
// get the expressions for that alias
JoinExprMap exmap = joinExprs.get(alias);
ExprNodeEvaluator[] valueFields = exmap.getValueFields();
// Compute the values
- CompositeHiveObject nr = new CompositeHiveObject(valueFields.length);
- for (ExprNodeEvaluator vField : valueFields)
- nr.addHiveObject(vField.evaluate(row));
+ ArrayList<Object> nr = new ArrayList<Object>(valueFields.length);
+ for (ExprNodeEvaluator vField : valueFields) {
+ vField.evaluate(row, rowInspector, tempAliasInspectableObject);
+ nr.add(tempAliasInspectableObject.o);
+ }
// Add the value to the vector
storage.get(alias).add(nr);
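
The initialize() code above also builds the join's output row inspector: one standard String inspector per output column, under positional field names. A minimal sketch of the same construction, assuming the serde2 factory methods imported earlier behave as the calls above suggest (the explicit fieldNames loop stands in for ObjectInspectorUtils.getIntegerArray):

    ArrayList<String> fieldNames = new ArrayList<String>();
    ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
    for (int i = 0; i < totalSz; i++) {
      fieldNames.add(String.valueOf(i));   // positional name: "0", "1", ...
      fieldOIs.add(ObjectInspectorFactory.getStandardPrimitiveObjectInspector(String.class));
    }
    ObjectInspector joinOI =
        ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);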
@@ -178,30 +185,29 @@
}
private void createForwardJoinObject(IntermediateObject intObj, boolean[] nullsArr) throws HiveException {
- CompositeHiveObject nr = new CompositeHiveObject(totalSz);
+ ArrayList<Object> nr = new ArrayList<Object>(totalSz);
for (int i = 0; i < numValues; i++) {
Byte alias = order[i];
int sz = joinExprs.get(alias).getValueFields().length;
- if (nullsArr[i])
- for (int j = 0; j < sz; j++)
- nr.addHiveObject(null);
- else
- {
- List <SerDeField> fields = joinExprs.get(alias).getListFields();
- CompositeHiveObject obj = intObj.getObjs()[i];
- for (int j = 0; j < sz; j++)
- nr.addHiveObject(obj.get(fields.get(j)));
+ if (nullsArr[i]) {
+ for (int j = 0; j < sz; j++) {
+ nr.add(null);
+ }
+ } else {
+ ArrayList<Object> obj = intObj.getObjs()[i];
+ for (int j = 0; j < sz; j++) {
+ nr.add(obj.get(j));
+ }
}
}
-
- forward(nr);
+ forward(nr, joinOutputObjectInspector);
}
private void copyOldArray(boolean[] src, boolean[] dest) {
for (int i = 0; i < src.length; i++) dest[i] = src[i];
}
- private Vector<boolean[]> joinObjectsInnerJoin(Vector<boolean[]> resNulls, Vector<boolean[]> inputNulls, CompositeHiveObject newObj, IntermediateObject intObj, int left, boolean newObjNull)
+ private Vector<boolean[]> joinObjectsInnerJoin(Vector<boolean[]> resNulls, Vector<boolean[]> inputNulls, ArrayList<Object> newObj, IntermediateObject intObj, int left, boolean newObjNull)
{
if (newObjNull) return resNulls;
Iterator<boolean[]> nullsIter = inputNulls.iterator();
@@ -219,7 +225,7 @@
return resNulls;
}
- private Vector<boolean[]> joinObjectsLeftOuterJoin(Vector<boolean[]> resNulls, Vector<boolean[]> inputNulls, CompositeHiveObject newObj, IntermediateObject intObj, int left, boolean newObjNull)
+ private Vector<boolean[]> joinObjectsLeftOuterJoin(Vector<boolean[]> resNulls, Vector<boolean[]> inputNulls, ArrayList<Object> newObj, IntermediateObject intObj, int left, boolean newObjNull)
{
Iterator<boolean[]> nullsIter = inputNulls.iterator();
while (nullsIter.hasNext())
@@ -237,7 +243,7 @@
return resNulls;
}
- private Vector<boolean[]> joinObjectsRightOuterJoin(Vector<boolean[]> resNulls, Vector<boolean[]> inputNulls, CompositeHiveObject newObj, IntermediateObject intObj, int left, boolean newObjNull)
+ private Vector<boolean[]> joinObjectsRightOuterJoin(Vector<boolean[]> resNulls, Vector<boolean[]> inputNulls, ArrayList<Object> newObj, IntermediateObject intObj, int left, boolean newObjNull)
{
if (newObjNull) return resNulls;
boolean allOldObjsNull = true;
@@ -276,7 +282,7 @@
return resNulls;
}
- private Vector<boolean[]> joinObjectsFullOuterJoin(Vector<boolean[]> resNulls, Vector<boolean[]> inputNulls, CompositeHiveObject newObj, IntermediateObject intObj, int left, boolean newObjNull)
+ private Vector<boolean[]> joinObjectsFullOuterJoin(Vector<boolean[]> resNulls, Vector<boolean[]> inputNulls, ArrayList<Object> newObj, IntermediateObject intObj, int left, boolean newObjNull)
{
if (newObjNull) {
Iterator<boolean[]> nullsIter = inputNulls.iterator();
@@ -344,7 +350,7 @@
* list of nulls is changed appropriately. The list will contain all non-nulls
* for an inner join. The outer joins are processed appropriately.
*/
- private Vector<boolean[]> joinObjects(Vector<boolean[]> inputNulls, CompositeHiveObject newObj, IntermediateObject intObj, int joinPos)
+ private Vector<boolean[]> joinObjects(Vector<boolean[]> inputNulls, ArrayList<Object> newObj, IntermediateObject intObj, int joinPos)
{
Vector<boolean[]> resNulls = new Vector<boolean[]>();
boolean newObjNull = (newObj == dummyObj[joinPos]);
@@ -395,11 +401,11 @@
private void genObject(Vector<boolean[]> inputNulls, int aliasNum, IntermediateObject intObj)
throws HiveException {
if (aliasNum < numValues) {
- Iterator<CompositeHiveObject> aliasRes = storage.get(order[aliasNum])
+ Iterator<ArrayList<Object>> aliasRes = storage.get(order[aliasNum])
.iterator();
iterators.push(aliasRes);
while (aliasRes.hasNext()) {
- CompositeHiveObject newObj = aliasRes.next();
+ ArrayList<Object> newObj = aliasRes.next();
intObj.pushObj(newObj);
Vector<boolean[]> newNulls = joinObjects(inputNulls, newObj, intObj, aliasNum);
genObject(newNulls, aliasNum + 1, intObj);
@@ -424,20 +430,24 @@
*/
public void endGroup() throws HiveException {
try {
- l4j.trace("Join Op: endGroup called");
+ LOG.trace("Join Op: endGroup called: numValues=" + numValues);
// does any result need to be emitted
for (int i = 0; i < numValues; i++) {
Byte alias = order[i];
if (storage.get(alias).iterator().hasNext() == false) {
- if (noOuterJoin)
+ if (noOuterJoin) {
+ LOG.trace("No data for alias=" + i);
return;
- else
+ } else {
storage.put(alias, dummyObjVectors[i]);
+ }
}
}
- genObject(null, 0, new IntermediateObject(new CompositeHiveObject[numValues], 0));
+ LOG.trace("calling genObject");
+ genObject(null, 0, new IntermediateObject(new ArrayList[numValues], 0));
+ LOG.trace("called genObject");
} catch (Exception e) {
e.printStackTrace();
throw new HiveException(e);
@@ -449,7 +459,7 @@
*
*/
public void close(boolean abort) throws HiveException {
- l4j.trace("Join Op close");
+ LOG.trace("Join Op close");
super.close(abort);
}
}
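
Per reduce-key group, endGroup() and genObject() above enumerate the cartesian product of the row vectors buffered for each alias, substituting a single all-null dummy row for an empty alias when outer-join semantics allow it. A self-contained sketch of that core loop (simplified: the per-pair joinObjects* null bookkeeping is omitted, and all names are hypothetical):

    import java.util.*;

    public class CartesianJoinSketch {
      // groups.get(i): rows buffered for alias i; widths[i]: columns alias i contributes
      static List<ArrayList<Object>> join(List<List<ArrayList<Object>>> groups, int[] widths) {
        List<ArrayList<Object>> out = new ArrayList<ArrayList<Object>>();
        gen(groups, widths, 0, new ArrayList<Object>(), out);
        return out;
      }

      static void gen(List<List<ArrayList<Object>>> groups, int[] widths, int pos,
                      ArrayList<Object> acc, List<ArrayList<Object>> out) {
        if (pos == groups.size()) {            // one full combination assembled
          out.add(new ArrayList<Object>(acc));
          return;
        }
        List<ArrayList<Object>> rows = groups.get(pos);
        if (rows.isEmpty()) {                  // outer-join padding: one all-null row
          rows = Collections.singletonList(
              new ArrayList<Object>(Collections.<Object>nCopies(widths[pos], null)));
        }
        for (ArrayList<Object> row : rows) {
          int mark = acc.size();
          acc.addAll(row);                     // push this alias's columns
          gen(groups, widths, pos + 1, acc, out);
          acc.subList(mark, acc.size()).clear();  // pop them when backtracking
        }
      }

      public static void main(String[] args) {
        List<ArrayList<Object>> left = new ArrayList<ArrayList<Object>>();
        left.add(new ArrayList<Object>(Arrays.<Object>asList("k1", "v1")));
        List<ArrayList<Object>> right = new ArrayList<ArrayList<Object>>(); // no match
        System.out.println(join(Arrays.asList(left, right), new int[]{2, 1}));
        // prints [[k1, v1, null]]
      }
    }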
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/LabeledCompositeHiveObject.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/LabeledCompositeHiveObject.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/LabeledCompositeHiveObject.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/LabeledCompositeHiveObject.java Fri Sep 19 16:56:30 2008
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec;
-
-import org.apache.hadoop.hive.serde.*;
-/**
- * wrapper over composite hive object that attaches names to each field
- * (instead of the positional names of CompositeHiveObject)
- */
-public class LabeledCompositeHiveObject extends CompositeHiveObject {
- String [] labels;
-
- public LabeledCompositeHiveObject(int width) {
- super(width);
- throw new RuntimeException ("Labeled Hive Objects require field names");
- }
-
- public LabeledCompositeHiveObject(String [] labels) {
- super(labels.length);
- this.labels = labels;
- }
-
- @Override
- public SerDeField getFieldFromExpression(String expr) {
-
- int dot = expr.indexOf(".");
- String label = expr;
- if(dot != -1) {
- assert(dot != (expr.length()-1));
-
- label = expr.substring(0, dot);
- expr = expr.substring(dot+1);
- } else {
- expr = null;
- }
-
- for(int i=0; i<width; i++) {
- if(label.equals(labels[i])) {
- return new CompositeSerDeField(i, expr);
- }
- }
- throw new RuntimeException ("Cannot match expression "+label+"."+expr+" against any label!");
- }
-}
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Fri Sep 19 16:56:30 2008
@@ -31,10 +31,12 @@
import org.apache.hadoop.hive.ql.plan.tableDesc;
import org.apache.hadoop.hive.ql.plan.partitionDesc;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde.ConstantTypedSerDeField;
-import org.apache.hadoop.hive.serde.SerDe;
-import org.apache.hadoop.hive.serde.SerDeException;
-import org.apache.hadoop.hive.serde.SerDeField;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
/**
@@ -48,9 +50,16 @@
private static final long serialVersionUID = 1L;
public static enum Counter {DESERIALIZE_ERRORS}
transient private LongWritable deserialize_error_count = new LongWritable ();
- transient private SerDe decoder;
- transient private ArrayList<String> partCols;
- transient private ArrayList<SerDeField> partFields;
+ transient private Deserializer deserializer;
+
+ transient private Object row;
+ transient private Object[] rowWithPart;
+ transient private StructObjectInspector rowObjectInspector;
+
+ transient private List<String> partNames;
+ transient private List<String> partValues;
+ transient private List<ObjectInspector> partObjectInspectors;
+
public void initialize(Configuration hconf) throws HiveException {
super.initialize(hconf);
@@ -66,12 +75,12 @@
// pick up work corresponding to this configuration path
List<String> aliases = conf.getPathToAliases().get(onefile);
for(String onealias: aliases) {
- l4j.info("Adding alias " + onealias + " to work list for file " + fpath.toUri().getPath());
+ LOG.info("Adding alias " + onealias + " to work list for file " + fpath.toUri().getPath());
todo.add(conf.getAliasToWork().get(onealias));
}
// initialize decoder once based on what table we are processing
- if(decoder != null) {
+ if(deserializer != null) {
continue;
}
@@ -83,7 +92,7 @@
HiveConf.setVar(hconf, HiveConf.ConfVars.HIVETABLENAME, String.valueOf(p.getProperty("name")));
HiveConf.setVar(hconf, HiveConf.ConfVars.HIVEPARTITIONNAME, String.valueOf(partSpec));
try {
- Class sdclass = td.getSerdeClass();
+ Class sdclass = td.getDeserializerClass();
if(sdclass == null) {
String className = td.getSerdeClassName();
if ((className == null) || className.equals("")) {
@@ -91,28 +100,40 @@
}
sdclass = MapOperator.class.getClassLoader().loadClass(className);
}
- decoder = (SerDe) sdclass.newInstance();
- decoder.initialize(hconf, p);
-
+ deserializer = (Deserializer) sdclass.newInstance();
+ deserializer.initialize(hconf, p);
+ rowObjectInspector = (StructObjectInspector)deserializer.getObjectInspector();
+
// Next check if this table has partitions and if so
// get the list of partition names as well as allocate
// the serdes for the partition columns
String pcols = p.getProperty(org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_PARTITION_COLUMNS);
if (pcols != null && pcols.length() > 0) {
- partCols = new ArrayList<String>();
- partFields = new ArrayList<SerDeField>();
- String[] part_keys = pcols.trim().split("/");
- for(String key: part_keys) {
- partCols.add(key);
- partFields.add(new ConstantTypedSerDeField(key, partSpec.get(key)));
+ partNames = new ArrayList<String>();
+ partValues = new ArrayList<String>();
+ partObjectInspectors = new ArrayList<ObjectInspector>();
+ String[] partKeys = pcols.trim().split("/");
+ for(String key: partKeys) {
+ partNames.add(key);
+ partValues.add(partSpec.get(key));
+ partObjectInspectors.add(
+ ObjectInspectorFactory.getStandardPrimitiveObjectInspector(String.class));
}
+ StructObjectInspector partObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(partNames, partObjectInspectors);
+
+ rowWithPart = new Object[2];
+ rowWithPart[1] = partValues;
+ rowObjectInspector = ObjectInspectorFactory.getUnionStructObjectInspector(
+ Arrays.asList(new StructObjectInspector[]{
+ rowObjectInspector,
+ partObjectInspector}));
}
else {
- partCols = null;
- partFields = null;
+ partNames = null;
+ partValues = null;
}
- l4j.info("Got partitions: " + pcols);
+ LOG.info("Got partitions: " + pcols);
} catch (SerDeException e) {
e.printStackTrace();
throw new HiveException (e);
@@ -129,7 +150,7 @@
if(todo.size() == 0) {
// didn't find match for input file path in configuration!
// serious problem ..
- l4j.error("Configuration does not have any alias for path: " + fpath.toUri().getPath());
+ LOG.error("Configuration does not have any alias for path: " + fpath.toUri().getPath());
throw new HiveException("Configuration and input path are inconsistent");
}
@@ -146,19 +167,24 @@
}
}
- public void process(HiveObject r) throws HiveException {
- throw new RuntimeException("Should not be called!");
- }
-
public void process(Writable value) throws HiveException {
try {
- Object ev = decoder.deserialize(value);
- HiveObject ho = new TableHiveObject(ev, decoder, partCols, partFields);
- forward(ho);
+ if (partNames == null) {
+ row = deserializer.deserialize(value);
+ forward(row, rowObjectInspector);
+ } else {
+ rowWithPart[0] = deserializer.deserialize(value);
+ forward(rowWithPart, rowObjectInspector);
+ }
} catch (SerDeException e) {
// TODO: policy on deserialization errors
deserialize_error_count.set(deserialize_error_count.get()+1);
throw new HiveException (e);
}
}
+
+ public void process(Object row, ObjectInspector rowInspector)
+ throws HiveException {
+ throw new HiveException("Hive 2 Internal error: should not be called!");
+ }
}
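
For partitioned tables, MapOperator above avoids copying partition values into every record: the deserialized columns and the constant partition values travel as a two-element pair, and a union struct inspector presents the pair as one flat row. A minimal sketch of the pattern, reusing the serde2 factory calls from the diff (the "ds" column, its value, and the surrounding deserializer/forward are assumed context for illustration, not names from this commit):

    List<String> partNames = Arrays.asList("ds");          // hypothetical partition column
    List<ObjectInspector> partOIs = Arrays.<ObjectInspector>asList(
        ObjectInspectorFactory.getStandardPrimitiveObjectInspector(String.class));
    StructObjectInspector partOI =
        ObjectInspectorFactory.getStandardStructObjectInspector(partNames, partOIs);
    StructObjectInspector dataOI = (StructObjectInspector) deserializer.getObjectInspector();
    StructObjectInspector fullOI = ObjectInspectorFactory.getUnionStructObjectInspector(
        Arrays.asList(new StructObjectInspector[]{dataOI, partOI}));
    Object[] rowWithPart = new Object[2];
    rowWithPart[1] = Arrays.asList("2008-09-19");          // constant for this input split
    // per record: rowWithPart[0] = deserializer.deserialize(value);
    //             forward(rowWithPart, fullOI);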
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java Fri Sep 19 16:56:30 2008
@@ -42,6 +42,7 @@
public int execute() {
try {
+ // enable assertion
String hadoopExec = conf.getVar(HiveConf.ConfVars.HADOOPBIN);
String hiveJar = conf.getJar();
String hiveConfArgs = ExecDriver.generateCmdLine(conf);
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/NullHiveObject.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/NullHiveObject.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/NullHiveObject.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/NullHiveObject.java Fri Sep 19 16:56:30 2008
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec;
-
-import java.util.*;
-
-import org.apache.hadoop.hive.serde.*;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-
-/**
- * Represents a NULL object
- */
-public class NullHiveObject extends HiveObject {
-
- public NullHiveObject() {
- setIsNull(true);
- }
-
- public SerDeField getFieldFromExpression(String expr) throws HiveException {
- return null;
- }
-
- public HiveObject get(SerDeField field) throws HiveException {
- return this;
- }
-
- public List<SerDeField> getFields() throws HiveException {
- return null;
- }
-
- public boolean isPrimitive() { return false;}
-
- public boolean equals(Object other) {
- return false;
- }
-
- @Override
- public int hashCode() {
- throw new RuntimeException("not supported");
- }
-}
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Fri Sep 19 16:56:30 2008
@@ -23,6 +23,8 @@
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.mapredWork;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.conf.Configuration;
@@ -86,7 +88,7 @@
transient protected HashMap<Enum<?>, LongWritable> statsMap = new HashMap<Enum<?>, LongWritable> ();
transient protected OutputCollector out;
- transient protected Log l4j;
+ transient protected Log LOG = LogFactory.getLog(this.getClass().getName());
transient protected mapredWork gWork;
transient protected String alias;
transient protected String joinAlias;
@@ -159,50 +161,47 @@
}
public void initialize (Configuration hconf) throws HiveException {
- l4j = LogFactory.getLog(this.getClass().getName());
- l4j.info("Initializing Self");
+ LOG.info("Initializing Self");
if(childOperators == null) {
return;
}
- l4j.info("Initializing children:");
+ LOG.info("Initializing children:");
for(Operator<? extends Serializable> op: childOperators) {
op.initialize(hconf);
}
- l4j.info("Initialization Done");
+ LOG.info("Initialization Done");
}
- public abstract void process(HiveObject r) throws HiveException;
+ public abstract void process(Object row, ObjectInspector rowInspector) throws HiveException;
// If a operator wants to do some work at the beginning of a group
public void startGroup() throws HiveException {
- l4j = LogFactory.getLog(this.getClass().getName());
- l4j.trace("Starting group");
+ LOG.debug("Starting group");
if (childOperators == null)
return;
- l4j.trace("Starting group for children:");
+ LOG.debug("Starting group for children:");
for (Operator<? extends Serializable> op: childOperators)
op.startGroup();
- l4j.trace("Start group Done");
+ LOG.debug("Start group Done");
}
// If a operator wants to do some work at the beginning of a group
public void endGroup() throws HiveException
{
- l4j = LogFactory.getLog(this.getClass().getName());
- l4j.trace("Ending group");
+ LOG.debug("Ending group");
if (childOperators == null)
return;
- l4j.trace("Ending group for children:");
+ LOG.debug("Ending group for children:");
for (Operator<? extends Serializable> op: childOperators)
op.endGroup();
- l4j.trace("Start group Done");
+ LOG.debug("End group Done");
}
public void close(boolean abort) throws HiveException {
@@ -218,12 +217,13 @@
}
}
- protected void forward(HiveObject r) throws HiveException {
+ protected void forward(Object row, ObjectInspector rowInspector) throws HiveException {
+
if(childOperators == null) {
return;
}
for(Operator<? extends Serializable> o: childOperators) {
- o.process(r);
+ o.process(row, rowInspector);
}
}
@@ -249,7 +249,7 @@
public void logStats () {
for(Enum<?> e: statsMap.keySet()) {
- l4j.info(e.toString() + ":" + statsMap.get(e).toString());
+ LOG.info(e.toString() + ":" + statsMap.get(e).toString());
}
}
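
The signature change above threads an (Object row, ObjectInspector rowInspector) pair through the whole operator tree in place of HiveObject: forward() simply hands the pair, unchanged, to every child's process(). A self-contained toy sketch of that dispatch shape (class and method names hypothetical; the inspector is reduced to Object for brevity):

    import java.util.*;

    abstract class MiniOp {
      private final List<MiniOp> children = new ArrayList<MiniOp>();
      void addChild(MiniOp op) { children.add(op); }
      // each concrete operator transforms the row, then calls forward()
      abstract void process(Object row, Object rowInspector) throws Exception;
      // fan the row (plus its inspector) out to every child operator
      protected void forward(Object row, Object rowInspector) throws Exception {
        for (MiniOp child : children) {
          child.process(row, rowInspector);
        }
      }
    }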
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/PrimitiveHiveObject.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/PrimitiveHiveObject.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/PrimitiveHiveObject.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/PrimitiveHiveObject.java Fri Sep 19 16:56:30 2008
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec;
-
-import java.util.*;
-
-import org.apache.hadoop.hive.serde.*;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-
-/**
- * Encapsulation for a primitive Java Object
- */
-
-public final class PrimitiveHiveObject extends HiveObject {
-
- public PrimitiveHiveObject(Object javaObject) {
-
- this.javaObject = javaObject;
- }
-
- public SerDeField getFieldFromExpression(String expr) throws HiveException {
- throw new HiveException ("Illegal call getFieldFromExpression() on Primitive Object");
- }
-
- public HiveObject get(SerDeField field) throws HiveException {
- throw new HiveException ("Illegal call get() on Primitive Object");
- }
-
- public List<SerDeField> getFields() throws HiveException {
- throw new HiveException ("Illegal call getFields() on Primitive Object");
- }
-
- public boolean isPrimitive() { return true; }
-
- @Override
- public String toString () {
- return (javaObject == null ? "" : javaObject.toString());
- }
-
- @Override
- public int hashCode() {
- return (javaObject == null ? 0 : javaObject.hashCode());
- }
-
- @Override
- public boolean equals(Object other) {
- if (! (other instanceof PrimitiveHiveObject)) return false;
- return javaObject == null ? ((PrimitiveHiveObject)other).javaObject == null
- : javaObject.equals(((PrimitiveHiveObject)other).javaObject);
- }
-}