Posted to commits@hive.apache.org by zs...@apache.org on 2010/01/21 11:38:15 UTC
svn commit: r901644 [9/37] - in /hadoop/hive/trunk: ./
ql/src/java/org/apache/hadoop/hive/ql/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/
ql/src/java/org/apache/hadoop/hive/ql/history/ ql/src/java...
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDAF.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDAF.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDAF.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDAF.java Thu Jan 21 10:37:58 2010
@@ -18,46 +18,36 @@
package org.apache.hadoop.hive.ql.exec;
-import org.apache.hadoop.hive.ql.udf.UDFType;
-//import org.apache.hadoop.hive.serde.ReflectionSerDe;
-
/**
* Base class for all User-defined Aggregation Function (UDAF) classes.
- *
+ *
* UDAF classes are REQUIRED to inherit from this class.
- *
- * Required for a UDAF class:
- * 1. Implement the init() method, which reset the status of the aggregation function.
- * 2. Implement a single method called "aggregate" that returns boolean. The method should
- * always return "true" on valid inputs, or the framework will throw an Exception.
- * Following are some examples:
- * public boolean aggregate(double a);
- * public boolean aggregate(int b);
- * public boolean aggregate(double c, double d);
- * 3. Implement a single method called "evaluate" that returns the FINAL aggregation result.
- * "evaluate" should never return "null" or an Exception will be thrown.
- * Following are some examples.
- * public int evaluate();
- * public long evaluate();
- * public double evaluate();
- * public Double evaluate();
- * public String evaluate();
- *
- * Optional for a UDAF class (by implementing these 2 methods, the user declares that the
- * UDAF support partial aggregations):
- * 1. Implement a single method called "evaluatePartial" that returns the PARTIAL aggregation
- * result. "evaluatePartial" should never return "null" or an Exception will be thrown.
- * 2. Implement a single method called "aggregatePartial" that takes a PARTIAL aggregation
- * result and returns a boolean. The method should always return "true" on valid inputs,
- * or the framework will throw an Exception.
- *
- * Following are some examples:
- * public int evaluatePartial();
- * public boolean aggregatePartial(int partial);
- *
- * public String evaluatePartial();
- * public boolean aggregatePartial(String partial);
- *
+ *
+ * Required for a UDAF class: 1. Implement the init() method, which reset the
+ * status of the aggregation function. 2. Implement a single method called
+ * "aggregate" that returns boolean. The method should always return "true" on
+ * valid inputs, or the framework will throw an Exception. Following are some
+ * examples: public boolean aggregate(double a); public boolean aggregate(int
+ * b); public boolean aggregate(double c, double d); 3. Implement a single
+ * method called "evaluate" that returns the FINAL aggregation result.
+ * "evaluate" should never return "null" or an Exception will be thrown.
+ * Following are some examples. public int evaluate(); public long evaluate();
+ * public double evaluate(); public Double evaluate(); public String evaluate();
+ *
+ * Optional for a UDAF class (by implementing these 2 methods, the user declares
+ * that the UDAF support partial aggregations): 1. Implement a single method
+ * called "evaluatePartial" that returns the PARTIAL aggregation result.
+ * "evaluatePartial" should never return "null" or an Exception will be thrown.
+ * 2. Implement a single method called "aggregatePartial" that takes a PARTIAL
+ * aggregation result and returns a boolean. The method should always return
+ * "true" on valid inputs, or the framework will throw an Exception.
+ *
+ * Following are some examples: public int evaluatePartial(); public boolean
+ * aggregatePartial(int partial);
+ *
+ * public String evaluatePartial(); public boolean aggregatePartial(String
+ * partial);
+ *
*/
public class UDAF {
@@ -65,11 +55,11 @@
* The resolver used for method resolution.
*/
UDAFEvaluatorResolver rslv;
-
+
/**
* The default constructor.
*/
- public UDAF() {
+ public UDAF() {
rslv = new DefaultUDAFEvaluatorResolver(this.getClass());
}
@@ -79,16 +69,17 @@
public UDAF(UDAFEvaluatorResolver rslv) {
this.rslv = rslv;
}
-
+
/**
* Sets the resolver
*
- * @param rslv The method resolver to use for method resolution.
+ * @param rslv
+ * The method resolver to use for method resolution.
*/
public void setResolver(UDAFEvaluatorResolver rslv) {
this.rslv = rslv;
}
-
+
/**
* Gets the resolver.
*/
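
For reference, a minimal UDAF that follows the contract spelled out in the Javadoc above might look like the sketch below. The class name and the sum logic are illustrative only and are not part of this commit; the optional evaluatePartial/aggregatePartial pair is included to show the partial-aggregation hooks.

import org.apache.hadoop.hive.ql.exec.UDAF;

public class UDAFExampleSum extends UDAF {

  private double sum;   // running total, reset by init()

  // 1. Reset the state of the aggregation.
  public void init() {
    sum = 0;
  }

  // 2. Aggregate one row; must return true on valid input.
  public boolean aggregate(double value) {
    sum += value;
    return true;
  }

  // Optional: produce the PARTIAL aggregation result (never null) ...
  public Double evaluatePartial() {
    return Double.valueOf(sum);
  }

  // ... and fold a PARTIAL result from another instance back in.
  public boolean aggregatePartial(Double partial) {
    sum += partial.doubleValue();
    return true;
  }

  // 3. Produce the FINAL aggregation result (never null).
  public Double evaluate() {
    return Double.valueOf(sum);
  }
}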
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDAFEvaluator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDAFEvaluator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDAFEvaluator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDAFEvaluator.java Thu Jan 21 10:37:58 2010
@@ -19,10 +19,10 @@
package org.apache.hadoop.hive.ql.exec;
/**
- * Interface that encapsulates the evaluation logic of a UDAF. One evaluator is needed
- * for every overloaded form of a UDAF .e.g max and min UDAFs would have evaluators for
- * integer, string and other types. On the other hand avg would have an evaluator only
- * for the double type.
+ * Interface that encapsulates the evaluation logic of a UDAF. One evaluator is
+ * needed for every overloaded form of a UDAF .e.g max and min UDAFs would have
+ * evaluators for integer, string and other types. On the other hand avg would
+ * have an evaluator only for the double type.
*/
public interface UDAFEvaluator {
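
To illustrate the one-evaluator-per-overload point, a max UDAF would typically ship one evaluator class per argument type. The sketch below is not part of this commit; only init() comes from the UDAFEvaluator interface (an assumption based on the surrounding code base), while the remaining method names follow Hive's reflection-based evaluator convention.

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

public class UDAFExampleMax extends UDAF {

  // Evaluator for the integer overload; MaxDoubleEvaluator, MaxStringEvaluator,
  // etc. would cover the other overloads mentioned in the Javadoc above.
  public static class MaxIntEvaluator implements UDAFEvaluator {

    private int max;
    private boolean empty;

    public void init() {
      max = 0;
      empty = true;
    }

    // Aggregate one value from the current group.
    public boolean iterate(Integer value) {
      if (value != null) {
        max = empty ? value.intValue() : Math.max(max, value.intValue());
        empty = false;
      }
      return true;
    }

    // Hand back the partial state computed on one mapper/reducer.
    public Integer terminatePartial() {
      return empty ? null : Integer.valueOf(max);
    }

    // Merge a partial state produced elsewhere.
    public boolean merge(Integer partial) {
      return iterate(partial);
    }

    // Produce the final result for the group.
    public Integer terminate() {
      return empty ? null : Integer.valueOf(max);
    }
  }
}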
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDAFEvaluatorResolver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDAFEvaluatorResolver.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDAFEvaluatorResolver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDAFEvaluatorResolver.java Thu Jan 21 10:37:58 2010
@@ -23,22 +23,24 @@
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
/**
- * The UDF Method resolver interface. A user can plugin a resolver to their UDF by implementing the
- * functions in this interface. Note that the resolver is stored in the UDF class as an instance
- * variable. We did not use a static variable because many resolvers maintain the class of the
- * enclosing UDF as state and are called from a base class e.g. UDFBaseCompare. This makes it very
- * easy to write UDFs that want to do resolution similar to the comparison operators. Such UDFs
- * just need to extend UDFBaseCompare and do not have to care about the UDFMethodResolver interface.
- * Same is true for UDFs that want to do resolution similar to that done by the numeric operators.
- * Such UDFs simply have to extend UDFBaseNumericOp class. For the default resolution the UDF
- * implementation simply needs to extend the UDF class.
+ * The UDF Method resolver interface. A user can plugin a resolver to their UDF
+ * by implementing the functions in this interface. Note that the resolver is
+ * stored in the UDF class as an instance variable. We did not use a static
+ * variable because many resolvers maintain the class of the enclosing UDF as
+ * state and are called from a base class e.g. UDFBaseCompare. This makes it
+ * very easy to write UDFs that want to do resolution similar to the comparison
+ * operators. Such UDFs just need to extend UDFBaseCompare and do not have to
+ * care about the UDFMethodResolver interface. Same is true for UDFs that want
+ * to do resolution similar to that done by the numeric operators. Such UDFs
+ * simply have to extend UDFBaseNumericOp class. For the default resolution the
+ * UDF implementation simply needs to extend the UDF class.
*/
public interface UDAFEvaluatorResolver {
-
+
/**
* Gets the evaluator class corresponding to the passed parameter list.
*/
Class<? extends UDAFEvaluator> getEvaluatorClass(List<TypeInfo> argClasses)
- throws AmbiguousMethodException;
-
+ throws AmbiguousMethodException;
+
}
\ No newline at end of file
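
A custom resolver is rarely needed, but a trivial one could pin every call to a single evaluator class. The class below is purely illustrative (not part of this commit); it would be installed through the UDAF(UDAFEvaluatorResolver) constructor or setResolver() shown in UDAF.java above.

import java.util.List;

import org.apache.hadoop.hive.ql.exec.AmbiguousMethodException;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluatorResolver;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

public class FixedEvaluatorResolver implements UDAFEvaluatorResolver {

  private final Class<? extends UDAFEvaluator> evaluatorClass;

  public FixedEvaluatorResolver(Class<? extends UDAFEvaluator> evaluatorClass) {
    this.evaluatorClass = evaluatorClass;
  }

  // Ignore the argument types and always hand back the same evaluator class.
  public Class<? extends UDAFEvaluator> getEvaluatorClass(List<TypeInfo> argClasses)
      throws AmbiguousMethodException {
    return evaluatorClass;
  }
}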
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDF.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDF.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDF.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDF.java Thu Jan 21 10:37:58 2010
@@ -25,19 +25,17 @@
*
* New UDF classes need to inherit from this UDF class.
*
- * Required for all UDF classes:
- * 1. Implement one or more methods named "evaluate" which will be called by Hive.
- * The following are some examples:
- * public int evaluate();
- * public int evaluate(int a);
- * public double evaluate(int a, double b);
- * public String evaluate(String a, int b, String c);
+ * Required for all UDF classes: 1. Implement one or more methods named
+ * "evaluate" which will be called by Hive. The following are some examples:
+ * public int evaluate(); public int evaluate(int a); public double evaluate(int
+ * a, double b); public String evaluate(String a, int b, String c);
*
- * "evaluate" should never be a void method. However it can return "null" if needed.
+ * "evaluate" should never be a void method. However it can return "null" if
+ * needed.
*/
-@UDFType(deterministic=true)
+@UDFType(deterministic = true)
public class UDF {
-
+
/**
* The resolver to use for method resolution.
*/
@@ -49,23 +47,24 @@
public UDF() {
rslv = new DefaultUDFMethodResolver(this.getClass());
}
-
+
/**
* The constructor with user-provided UDFMethodResolver.
*/
protected UDF(UDFMethodResolver rslv) {
this.rslv = rslv;
}
-
+
/**
* Sets the resolver
*
- * @param rslv The method resolver to use for method resolution.
+ * @param rslv
+ * The method resolver to use for method resolution.
*/
public void setResolver(UDFMethodResolver rslv) {
this.rslv = rslv;
}
-
+
/**
* Get the method resolver.
*/
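
A minimal UDF following the contract above might provide a couple of evaluate overloads; Hive resolves the one whose signature matches the call. The class name and arithmetic below are illustrative, not part of this commit.

import org.apache.hadoop.hive.ql.exec.UDF;

public class UDFExampleAdd extends UDF {

  // Integer overload; returning null is allowed, a void evaluate is not.
  public Integer evaluate(Integer a, Integer b) {
    if (a == null || b == null) {
      return null;
    }
    return Integer.valueOf(a.intValue() + b.intValue());
  }

  // Double overload, picked when the arguments are doubles.
  public Double evaluate(Double a, Double b) {
    if (a == null || b == null) {
      return null;
    }
    return Double.valueOf(a.doubleValue() + b.doubleValue());
  }
}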
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDFArgumentException.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDFArgumentException.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDFArgumentException.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDFArgumentException.java Thu Jan 21 10:37:58 2010
@@ -20,7 +20,7 @@
import org.apache.hadoop.hive.ql.parse.SemanticException;
-/**
+/**
* exception class, thrown when udf argument have something wrong.
*/
public class UDFArgumentException extends SemanticException {
@@ -28,13 +28,13 @@
public UDFArgumentException() {
super();
}
-
+
public UDFArgumentException(String message) {
super(message);
}
-
+
public UDFArgumentException(Throwable cause) {
super(cause);
}
-
+
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDFArgumentTypeException.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDFArgumentTypeException.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDFArgumentTypeException.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDFArgumentTypeException.java Thu Jan 21 10:37:58 2010
@@ -18,19 +18,18 @@
package org.apache.hadoop.hive.ql.exec;
-
-/**
+/**
* exception class, thrown when udf arguments have wrong types.
*/
public class UDFArgumentTypeException extends UDFArgumentException {
-
+
int argumentId;
-
+
public UDFArgumentTypeException() {
super();
}
-
+
public UDFArgumentTypeException(int argumentId, String message) {
super(message);
this.argumentId = argumentId;
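
The extra argumentId lets error messages point at the offending argument position. A hypothetical validation helper (not part of this commit) might use it like this:

import java.util.List;

import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

public class ArgumentChecks {

  // Hypothetical check, e.g. called from a resolver; positions are zero-based here.
  static void assertNumericArgs(List<TypeInfo> argTypes)
      throws UDFArgumentTypeException {
    for (int i = 0; i < argTypes.size(); i++) {
      String typeName = argTypes.get(i).getTypeName();
      if (!"int".equals(typeName) && !"double".equals(typeName)) {
        throw new UDFArgumentTypeException(i,
            "numeric argument expected, but " + typeName + " was found");
      }
    }
  }
}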
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDFMethodResolver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDFMethodResolver.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDFMethodResolver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDFMethodResolver.java Thu Jan 21 10:37:58 2010
@@ -24,24 +24,27 @@
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
/**
- * The UDF Method resolver interface. A user can plugin a resolver to their UDF by implementing the
- * functions in this interface. Note that the resolver is stored in the UDF class as an instance
- * variable. We did not use a static variable because many resolvers maintain the class of the
- * enclosing UDF as state and are called from a base class e.g. UDFBaseCompare. This makes it very
- * easy to write UDFs that want to do resolution similar to the comparison operators. Such UDFs
- * just need to extend UDFBaseCompare and do not have to care about the UDFMethodResolver interface.
- * Same is true for UDFs that want to do resolution similar to that done by the numeric operators.
- * Such UDFs simply have to extend UDFBaseNumericOp class. For the default resolution the UDF
- * implementation simply needs to extend the UDF class.
+ * The UDF Method resolver interface. A user can plugin a resolver to their UDF
+ * by implementing the functions in this interface. Note that the resolver is
+ * stored in the UDF class as an instance variable. We did not use a static
+ * variable because many resolvers maintain the class of the enclosing UDF as
+ * state and are called from a base class e.g. UDFBaseCompare. This makes it
+ * very easy to write UDFs that want to do resolution similar to the comparison
+ * operators. Such UDFs just need to extend UDFBaseCompare and do not have to
+ * care about the UDFMethodResolver interface. Same is true for UDFs that want
+ * to do resolution similar to that done by the numeric operators. Such UDFs
+ * simply have to extend UDFBaseNumericOp class. For the default resolution the
+ * UDF implementation simply needs to extend the UDF class.
*/
public interface UDFMethodResolver {
-
+
/**
* Gets the evaluate method for the UDF given the parameter types.
*
- * @param argClasses The list of the argument types that need to matched with the evaluate
- * function signature.
+ * @param argClasses
+ * The list of the argument types that need to matched with the
+ * evaluate function signature.
*/
- public Method getEvalMethod(List<TypeInfo> argClasses)
- throws AmbiguousMethodException, UDFArgumentException;
+ public Method getEvalMethod(List<TypeInfo> argClasses)
+ throws AmbiguousMethodException, UDFArgumentException;
}
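
As with the UDAF resolver, a user-supplied UDFMethodResolver is only needed for unusual resolution rules. A deliberately simple sketch (illustrative, not part of this commit) that returns the first evaluate method it finds could be plugged in through the protected UDF(UDFMethodResolver) constructor or setResolver():

import java.lang.reflect.Method;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.AmbiguousMethodException;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFMethodResolver;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

public class FirstEvaluateMethodResolver implements UDFMethodResolver {

  private final Class<? extends UDF> udfClass;

  public FirstEvaluateMethodResolver(Class<? extends UDF> udfClass) {
    this.udfClass = udfClass;
  }

  // Ignore the argument types and return the first "evaluate" method found.
  public Method getEvalMethod(List<TypeInfo> argClasses)
      throws AmbiguousMethodException, UDFArgumentException {
    for (Method m : udfClass.getMethods()) {
      if (m.getName().equals("evaluate")) {
        return m;
      }
    }
    throw new UDFArgumentException("no evaluate method found on " + udfClass);
  }
}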
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java Thu Jan 21 10:37:58 2010
@@ -19,12 +19,7 @@
package org.apache.hadoop.hive.ql.exec;
import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -35,7 +30,6 @@
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.ql.udf.generic.UDTFCollector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
@@ -45,29 +39,31 @@
protected final Log LOG = LogFactory.getLog(this.getClass().getName());
- ObjectInspector [] udtfInputOIs = null;
- Object [] objToSendToUDTF = null;
- Object [] forwardObj = new Object[1];
+ ObjectInspector[] udtfInputOIs = null;
+ Object[] objToSendToUDTF = null;
+ Object[] forwardObj = new Object[1];
/**
* sends periodic reports back to the tracker.
*/
transient AutoProgressor autoProgressor;
transient boolean closeCalled = false;
-
+
+ @Override
protected void initializeOp(Configuration hconf) throws HiveException {
conf.getGenericUDTF().setCollector(new UDTFCollector(this));
// Make an object inspector [] of the arguments to the UDTF
- List<? extends StructField> inputFields =
- ((StandardStructObjectInspector)inputObjInspectors[0]).getAllStructFieldRefs();
+ List<? extends StructField> inputFields = ((StandardStructObjectInspector) inputObjInspectors[0])
+ .getAllStructFieldRefs();
udtfInputOIs = new ObjectInspector[inputFields.size()];
- for (int i=0; i<inputFields.size(); i++) {
+ for (int i = 0; i < inputFields.size(); i++) {
udtfInputOIs[i] = inputFields.get(i).getFieldObjectInspector();
}
objToSendToUDTF = new Object[inputFields.size()];
- StructObjectInspector udtfOutputOI = conf.getGenericUDTF().initialize(udtfInputOIs);
+ StructObjectInspector udtfOutputOI = conf.getGenericUDTF().initialize(
+ udtfInputOIs);
// Since we're passing the object output by the UDTF directly to the next
// operator, we can use the same OI.
@@ -85,24 +81,25 @@
super.initializeOp(hconf);
}
+ @Override
public void processOp(Object row, int tag) throws HiveException {
// The UDTF expects arguments in an object[]
- StandardStructObjectInspector soi =
- (StandardStructObjectInspector) inputObjInspectors[tag];
+ StandardStructObjectInspector soi = (StandardStructObjectInspector) inputObjInspectors[tag];
List<? extends StructField> fields = soi.getAllStructFieldRefs();
- for (int i=0; i<fields.size(); i++) {
+ for (int i = 0; i < fields.size(); i++) {
objToSendToUDTF[i] = soi.getStructFieldData(row, fields.get(i));
}
conf.getGenericUDTF().process(objToSendToUDTF);
}
+
/**
* forwardUDTFOutput is typically called indirectly by the GenericUDTF when
* the GenericUDTF has generated output rows that should be passed on to the
* next operator(s) in the DAG.
- *
+ *
* @param o
* @throws HiveException
*/
@@ -114,14 +111,17 @@
forward(o, outputObjInspector);
}
+ @Override
public String getName() {
return "UDTF";
}
+ @Override
public int getType() {
return OperatorType.UDTF;
}
+ @Override
protected void closeOp(boolean abort) throws HiveException {
closeCalled = true;
conf.getGenericUDTF().close();
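
The operator above drives a GenericUDTF through setCollector/initialize/process/close. Below is a hedged sketch of a UDTF that simply re-emits its arguments as one output row; the exact GenericUDTF signatures and the forward() helper (which routes rows through the collector back into forwardUDTFOutput) are assumptions inferred from the calls in this operator, and the class is not part of this commit.

import java.util.ArrayList;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

public class GenericUDTFIdentity extends GenericUDTF {

  // Describe the output row: one column per input argument, same inspectors.
  public StructObjectInspector initialize(ObjectInspector[] argOIs)
      throws UDFArgumentException {
    ArrayList<String> fieldNames = new ArrayList<String>();
    ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
    for (int i = 0; i < argOIs.length; i++) {
      fieldNames.add("col" + i);
      fieldOIs.add(argOIs[i]);
    }
    return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,
        fieldOIs);
  }

  // Called once per input row; forward() hands rows to the collector, which
  // in turn calls UDTFOperator.forwardUDTFOutput() on the operator above.
  public void process(Object[] args) throws HiveException {
    forward(args);
  }

  // Nothing is buffered, so there is nothing to flush at close time.
  public void close() throws HiveException {
  }
}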
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java Thu Jan 21 10:37:58 2010
@@ -33,64 +33,67 @@
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
/**
- * Union Operator
- * Just forwards. Doesn't do anything itself.
+ * Union Operator Just forwards. Doesn't do anything itself.
**/
-public class UnionOperator extends Operator<unionDesc> implements Serializable {
+public class UnionOperator extends Operator<unionDesc> implements Serializable {
private static final long serialVersionUID = 1L;
-
+
StructObjectInspector[] parentObjInspectors;
List<? extends StructField>[] parentFields;
ReturnObjectInspectorResolver[] columnTypeResolvers;
boolean[] needsTransform;
-
+
ArrayList<Object> outputRow;
- /** UnionOperator will transform the input rows if the inputObjInspectors
- * from different parents are different.
- * If one parent has exactly the same ObjectInspector as the output
- * ObjectInspector, then we don't need to do transformation for that parent.
- * This information is recorded in needsTransform[].
+ /**
+ * UnionOperator will transform the input rows if the inputObjInspectors from
+ * different parents are different. If one parent has exactly the same
+ * ObjectInspector as the output ObjectInspector, then we don't need to do
+ * transformation for that parent. This information is recorded in
+ * needsTransform[].
*/
+ @Override
protected void initializeOp(Configuration hconf) throws HiveException {
-
+
int parents = parentOperators.size();
parentObjInspectors = new StructObjectInspector[parents];
parentFields = new List[parents];
for (int p = 0; p < parents; p++) {
- parentObjInspectors[p] = (StructObjectInspector)inputObjInspectors[p];
+ parentObjInspectors[p] = (StructObjectInspector) inputObjInspectors[p];
parentFields[p] = parentObjInspectors[p].getAllStructFieldRefs();
}
-
+
// Get columnNames from the first parent
int columns = parentFields[0].size();
ArrayList<String> columnNames = new ArrayList<String>(columns);
for (int c = 0; c < columns; c++) {
columnNames.add(parentFields[0].get(c).getFieldName());
}
-
+
// Get outputFieldOIs
columnTypeResolvers = new ReturnObjectInspectorResolver[columns];
for (int c = 0; c < columns; c++) {
columnTypeResolvers[c] = new ReturnObjectInspectorResolver();
}
-
+
for (int p = 0; p < parents; p++) {
- assert(parentFields[p].size() == columns);
+ assert (parentFields[p].size() == columns);
for (int c = 0; c < columns; c++) {
- columnTypeResolvers[c].update(parentFields[p].get(c).getFieldObjectInspector());
+ columnTypeResolvers[c].update(parentFields[p].get(c)
+ .getFieldObjectInspector());
}
}
-
- ArrayList<ObjectInspector> outputFieldOIs = new ArrayList<ObjectInspector>(columns);
+
+ ArrayList<ObjectInspector> outputFieldOIs = new ArrayList<ObjectInspector>(
+ columns);
for (int c = 0; c < columns; c++) {
outputFieldOIs.add(columnTypeResolvers[c].get());
}
-
+
// create output row ObjectInspector
- outputObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
- columnNames, outputFieldOIs);
+ outputObjInspector = ObjectInspectorFactory
+ .getStandardStructObjectInspector(columnNames, outputFieldOIs);
outputRow = new ArrayList<Object>(columns);
for (int c = 0; c < columns; c++) {
outputRow.add(null);
@@ -99,17 +102,18 @@
// whether we need to do transformation for each parent
needsTransform = new boolean[parents];
for (int p = 0; p < parents; p++) {
- // Testing using != is good enough, because we use ObjectInspectorFactory to
+ // Testing using != is good enough, because we use ObjectInspectorFactory
+ // to
// create ObjectInspectors.
needsTransform[p] = (inputObjInspectors[p] != outputObjInspector);
if (needsTransform[p]) {
- LOG.info("Union Operator needs to transform row from parent[" + p + "] from "
- + inputObjInspectors[p] + " to " + outputObjInspector);
+ LOG.info("Union Operator needs to transform row from parent[" + p
+ + "] from " + inputObjInspectors[p] + " to " + outputObjInspector);
}
}
initializeChildren(hconf);
}
-
+
@Override
public synchronized void processOp(Object row, int tag) throws HiveException {
@@ -118,9 +122,9 @@
if (needsTransform[tag]) {
for (int c = 0; c < fields.size(); c++) {
- outputRow.set(c, columnTypeResolvers[c].convertIfNecessary(
- soi.getStructFieldData(row, fields.get(c)),
- fields.get(c).getFieldObjectInspector()));
+ outputRow.set(c, columnTypeResolvers[c].convertIfNecessary(soi
+ .getStructFieldData(row, fields.get(c)), fields.get(c)
+ .getFieldObjectInspector()));
}
forward(outputRow, outputObjInspector);
} else {
@@ -135,7 +139,8 @@
public String getName() {
return new String("UNION");
}
-
+
+ @Override
public int getType() {
return OperatorType.UNION;
}
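
The needsTransform[] test above compares ObjectInspectors with plain !=, which is sound only because ObjectInspectorFactory caches the inspectors it creates. A small stand-alone illustration of that assumption (not part of this commit):

import java.util.ArrayList;

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class OiCachingCheck {
  public static void main(String[] args) {
    ArrayList<String> names = new ArrayList<String>();
    names.add("c0");
    ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
    ois.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

    ObjectInspector a = ObjectInspectorFactory
        .getStandardStructObjectInspector(names, ois);
    ObjectInspector b = ObjectInspectorFactory
        .getStandardStructObjectInspector(names, ois);

    // Structurally identical requests should come back as the same cached
    // instance, which is what lets UnionOperator rely on reference equality.
    System.out.println(a == b);   // expected: true, given the caching behaviour
  }
}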
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Thu Jan 21 10:37:58 2010
@@ -18,56 +18,77 @@
package org.apache.hadoop.hive.ql.exec;
-
-import java.io.*;
+import java.beans.DefaultPersistenceDelegate;
+import java.beans.Encoder;
+import java.beans.Expression;
+import java.beans.XMLDecoder;
+import java.beans.XMLEncoder;
+import java.io.BufferedReader;
+import java.io.DataInput;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.io.Serializable;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import java.beans.*;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.WordUtils;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.util.ReflectionUtils;
-
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Order;
-import org.apache.hadoop.hive.ql.parse.ErrorMsg;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.*;
-import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.ErrorMsg;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.groupByDesc;
+import org.apache.hadoop.hive.ql.plan.mapredWork;
+import org.apache.hadoop.hive.ql.plan.partitionDesc;
+import org.apache.hadoop.hive.ql.plan.tableDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
-import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.Log;
+import org.apache.hadoop.util.ReflectionUtils;
@SuppressWarnings("nls")
public class Utilities {
@@ -75,47 +96,51 @@
/**
* The object in the reducer are composed of these top level fields
*/
-
+
public static String HADOOP_LOCAL_FS = "file:///";
-
- public static enum ReduceField { KEY, VALUE, ALIAS };
- private static Map<String, mapredWork> gWorkMap=
- Collections.synchronizedMap(new HashMap<String, mapredWork>());
+
+ public static enum ReduceField {
+ KEY, VALUE, ALIAS
+ };
+
+ private static Map<String, mapredWork> gWorkMap = Collections
+ .synchronizedMap(new HashMap<String, mapredWork>());
static final private Log LOG = LogFactory.getLog(Utilities.class.getName());
- public static void clearMapRedWork (Configuration job) {
+ public static void clearMapRedWork(Configuration job) {
try {
Path planPath = new Path(HiveConf.getVar(job, HiveConf.ConfVars.PLAN));
FileSystem fs = FileSystem.get(job);
- if(fs.exists(planPath)) {
- try {
- fs.delete(planPath, true);
- } catch (IOException e) {
- e.printStackTrace();
- }
+ if (fs.exists(planPath)) {
+ try {
+ fs.delete(planPath, true);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
}
} catch (Exception e) {
} finally {
// where a single process works with multiple plans - we must clear
// the cache before working with the next plan.
- synchronized(gWorkMap) {
+ synchronized (gWorkMap) {
gWorkMap.remove(getJobName(job));
}
}
}
- public static mapredWork getMapRedWork (Configuration job) {
+ public static mapredWork getMapRedWork(Configuration job) {
mapredWork gWork = null;
try {
- synchronized(gWorkMap) {
+ synchronized (gWorkMap) {
gWork = gWorkMap.get(getJobName(job));
}
- if(gWork == null) {
+ if (gWork == null) {
synchronized (Utilities.class) {
- if(gWork != null)
+ if (gWork != null) {
return (gWork);
+ }
InputStream in = new FileInputStream("HIVE_PLAN"
- +sanitizedJobId(job));
+ + sanitizedJobId(job));
mapredWork ret = deserializeMapRedWork(in, job);
gWork = ret;
gWork.initialize();
@@ -126,7 +151,7 @@
return (gWork);
} catch (Exception e) {
e.printStackTrace();
- throw new RuntimeException (e);
+ throw new RuntimeException(e);
}
}
@@ -136,60 +161,62 @@
}
ArrayList<String> ret = new ArrayList<String>();
- for(FieldSchema f: fl) {
- ret.add(f.getName() + " " + f.getType() +
- (f.getComment() != null ? (" " + f.getComment()) : ""));
+ for (FieldSchema f : fl) {
+ ret.add(f.getName() + " " + f.getType()
+ + (f.getComment() != null ? (" " + f.getComment()) : ""));
}
return ret;
}
/**
- * Java 1.5 workaround.
- * From http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5015403
+ * Java 1.5 workaround. From
+ * http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5015403
*/
public static class EnumDelegate extends DefaultPersistenceDelegate {
@Override
- protected Expression instantiate(Object oldInstance, Encoder out) {
- return new Expression(Enum.class,
- "valueOf",
- new Object[] { oldInstance.getClass(), ((Enum<?>) oldInstance).name() });
+ protected Expression instantiate(Object oldInstance, Encoder out) {
+ return new Expression(Enum.class, "valueOf", new Object[] {
+ oldInstance.getClass(), ((Enum<?>) oldInstance).name() });
}
+
+ @Override
protected boolean mutatesTo(Object oldInstance, Object newInstance) {
return oldInstance == newInstance;
}
}
- public static void setMapRedWork (Configuration job, mapredWork w) {
+ public static void setMapRedWork(Configuration job, mapredWork w) {
try {
// use the default file system of the job
FileSystem fs = FileSystem.get(job);
- Path planPath = new Path(HiveConf.getVar(job, HiveConf.ConfVars.SCRATCHDIR),
- "plan."+randGen.nextInt());
+ Path planPath = new Path(HiveConf.getVar(job,
+ HiveConf.ConfVars.SCRATCHDIR), "plan." + randGen.nextInt());
FSDataOutputStream out = fs.create(planPath);
serializeMapRedWork(w, out);
HiveConf.setVar(job, HiveConf.ConfVars.PLAN, planPath.toString());
// Set up distributed cache
DistributedCache.createSymlink(job);
String uriWithLink = planPath.toUri().toString() + "#HIVE_PLAN"
- +sanitizedJobId(job);
+ + sanitizedJobId(job);
DistributedCache.addCacheFile(new URI(uriWithLink), job);
- // Cache the object in this process too so lookups don't hit the file system
+ // Cache the object in this process too so lookups don't hit the file
+ // system
synchronized (Utilities.class) {
w.initialize();
- gWorkMap.put(getJobName(job),w);
+ gWorkMap.put(getJobName(job), w);
}
} catch (Exception e) {
e.printStackTrace();
- throw new RuntimeException (e);
+ throw new RuntimeException(e);
}
}
- public static String getJobName( Configuration job) {
+ public static String getJobName(Configuration job) {
String s = HiveConf.getVar(job, HiveConf.ConfVars.HADOOPJOBNAME);
// This is just a backup case. We would like Hive to always have jobnames.
- if(s == null) {
+ if (s == null) {
// There is no job name => we set one
- s = "JOB"+randGen.nextInt();
+ s = "JOB" + randGen.nextInt();
HiveConf.setVar(job, HiveConf.ConfVars.HADOOPJOBNAME, s);
}
return s;
@@ -204,54 +231,64 @@
return s.hashCode();
}
- public static void serializeTasks(Task<? extends Serializable> t, OutputStream out) {
+ public static void serializeTasks(Task<? extends Serializable> t,
+ OutputStream out) {
XMLEncoder e = new XMLEncoder(out);
// workaround for java 1.5
- e.setPersistenceDelegate( ExpressionTypes.class, new EnumDelegate() );
- e.setPersistenceDelegate( groupByDesc.Mode.class, new EnumDelegate());
- e.setPersistenceDelegate( Operator.ProgressCounter.class, new EnumDelegate());
+ e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate());
+ e.setPersistenceDelegate(groupByDesc.Mode.class, new EnumDelegate());
+ e
+ .setPersistenceDelegate(Operator.ProgressCounter.class,
+ new EnumDelegate());
e.writeObject(t);
e.close();
}
/**
- * Serialize the plan object to an output stream.
- * DO NOT use this to write to standard output since it closes the output stream
- * DO USE mapredWork.toXML() instead
+ * Serialize the plan object to an output stream. DO NOT use this to write to
+ * standard output since it closes the output stream DO USE mapredWork.toXML()
+ * instead
*/
public static void serializeMapRedWork(mapredWork w, OutputStream out) {
XMLEncoder e = new XMLEncoder(out);
// workaround for java 1.5
- e.setPersistenceDelegate( ExpressionTypes.class, new EnumDelegate() );
- e.setPersistenceDelegate( groupByDesc.Mode.class, new EnumDelegate());
+ e.setPersistenceDelegate(ExpressionTypes.class, new EnumDelegate());
+ e.setPersistenceDelegate(groupByDesc.Mode.class, new EnumDelegate());
e.writeObject(w);
e.close();
}
- public static mapredWork deserializeMapRedWork (InputStream in, Configuration conf) {
+ public static mapredWork deserializeMapRedWork(InputStream in,
+ Configuration conf) {
XMLDecoder d = new XMLDecoder(in, null, null, conf.getClassLoader());
- mapredWork ret = (mapredWork)d.readObject();
+ mapredWork ret = (mapredWork) d.readObject();
d.close();
return (ret);
}
public static class Tuple<T, V> {
- private T one;
- private V two;
+ private final T one;
+ private final V two;
public Tuple(T one, V two) {
this.one = one;
this.two = two;
}
- public T getOne() {return this.one;}
- public V getTwo() {return this.two;}
+
+ public T getOne() {
+ return this.one;
+ }
+
+ public V getTwo() {
+ return this.two;
+ }
}
public static tableDesc defaultTd;
static {
// by default we expect ^A separated strings
- // This tableDesc does not provide column names. We should always use
+ // This tableDesc does not provide column names. We should always use
// PlanUtils.getDefaultTableDesc(String separatorCode, String columns)
// or getBinarySortableTableDesc(List<FieldSchema> fieldSchemas) when
// we know the column names.
@@ -273,44 +310,42 @@
public static Random randGen = new Random();
/**
- * Gets the task id if we are running as a Hadoop job.
- * Gets a random number otherwise.
+ * Gets the task id if we are running as a Hadoop job. Gets a random number
+ * otherwise.
*/
public static String getTaskId(Configuration hconf) {
String taskid = (hconf == null) ? null : hconf.get("mapred.task.id");
- if((taskid == null) || taskid.equals("")) {
- return (""+randGen.nextInt());
+ if ((taskid == null) || taskid.equals("")) {
+ return ("" + randGen.nextInt());
} else {
return taskid.replaceAll("task_[0-9]+_", "");
}
}
- public static HashMap makeMap(Object ... olist) {
- HashMap ret = new HashMap ();
- for(int i=0; i<olist.length; i += 2) {
- ret.put(olist[i], olist[i+1]);
+ public static HashMap makeMap(Object... olist) {
+ HashMap ret = new HashMap();
+ for (int i = 0; i < olist.length; i += 2) {
+ ret.put(olist[i], olist[i + 1]);
}
return (ret);
}
- public static Properties makeProperties(String ... olist) {
- Properties ret = new Properties ();
- for(int i=0; i<olist.length; i += 2) {
- ret.setProperty(olist[i], olist[i+1]);
+ public static Properties makeProperties(String... olist) {
+ Properties ret = new Properties();
+ for (int i = 0; i < olist.length; i += 2) {
+ ret.setProperty(olist[i], olist[i + 1]);
}
return (ret);
}
- public static ArrayList makeList(Object ... olist) {
- ArrayList ret = new ArrayList ();
- for(int i=0; i<olist.length; i++) {
- ret.add(olist[i]);
+ public static ArrayList makeList(Object... olist) {
+ ArrayList ret = new ArrayList();
+ for (Object element : olist) {
+ ret.add(element);
}
return (ret);
}
-
-
public static class StreamPrinter extends Thread {
InputStream is;
String type;
@@ -322,17 +357,20 @@
this.os = os;
}
+ @Override
public void run() {
try {
InputStreamReader isr = new InputStreamReader(is);
BufferedReader br = new BufferedReader(isr);
- String line=null;
- if(type != null) {
- while ( (line = br.readLine()) != null)
+ String line = null;
+ if (type != null) {
+ while ((line = br.readLine()) != null) {
os.println(type + ">" + line);
+ }
} else {
- while ( (line = br.readLine()) != null)
+ while ((line = br.readLine()) != null) {
os.println(line);
+ }
}
} catch (IOException ioe) {
ioe.printStackTrace();
@@ -341,40 +379,45 @@
}
public static tableDesc getTableDesc(Table tbl) {
- return (new tableDesc (tbl.getDeserializer().getClass(), tbl.getInputFormatClass(), tbl.getOutputFormatClass(), tbl.getSchema()));
+ return (new tableDesc(tbl.getDeserializer().getClass(), tbl
+ .getInputFormatClass(), tbl.getOutputFormatClass(), tbl.getSchema()));
}
-
- //column names and column types are all delimited by comma
+
+ // column names and column types are all delimited by comma
public static tableDesc getTableDesc(String cols, String colTypes) {
return (new tableDesc(LazySimpleSerDe.class, SequenceFileInputFormat.class,
HiveSequenceFileOutputFormat.class, Utilities.makeProperties(
- org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "" + Utilities.ctrlaCode,
+ org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, ""
+ + Utilities.ctrlaCode,
org.apache.hadoop.hive.serde.Constants.LIST_COLUMNS, cols,
org.apache.hadoop.hive.serde.Constants.LIST_COLUMN_TYPES, colTypes)));
}
-
- public static partitionDesc getPartitionDesc(Partition part) throws HiveException {
- return (new partitionDesc (part));
+ public static partitionDesc getPartitionDesc(Partition part)
+ throws HiveException {
+ return (new partitionDesc(part));
}
- public static void addMapWork(mapredWork mr, Table tbl, String alias, Operator<?> work) {
+ public static void addMapWork(mapredWork mr, Table tbl, String alias,
+ Operator<?> work) {
mr.addMapWork(tbl.getDataLocation().getPath(), alias, work,
- new partitionDesc(getTableDesc(tbl), null));
+ new partitionDesc(getTableDesc(tbl), null));
}
private static String getOpTreeSkel_helper(Operator<?> op, String indent) {
- if (op == null)
+ if (op == null) {
return "";
+ }
StringBuffer sb = new StringBuffer();
sb.append(indent);
sb.append(op.toString());
sb.append("\n");
- if (op.getChildOperators() != null)
- for(Object child: op.getChildOperators()) {
- sb.append(getOpTreeSkel_helper((Operator<?>)child, indent + " "));
+ if (op.getChildOperators() != null) {
+ for (Object child : op.getChildOperators()) {
+ sb.append(getOpTreeSkel_helper((Operator<?>) child, indent + " "));
}
+ }
return sb.toString();
}
@@ -383,39 +426,46 @@
return getOpTreeSkel_helper(op, "");
}
- private static boolean isWhitespace( int c ) {
- if( c == -1 ) { return false; }
- return Character.isWhitespace( ( char )c );
+ private static boolean isWhitespace(int c) {
+ if (c == -1) {
+ return false;
+ }
+ return Character.isWhitespace((char) c);
}
- public static boolean contentsEqual( InputStream is1, InputStream is2, boolean ignoreWhitespace )
- throws IOException {
+ public static boolean contentsEqual(InputStream is1, InputStream is2,
+ boolean ignoreWhitespace) throws IOException {
try {
- if((is1 == is2) || (is1 == null && is2 == null))
- return true;
+ if ((is1 == is2) || (is1 == null && is2 == null)) {
+ return true;
+ }
- if(is1 == null || is2 == null)
+ if (is1 == null || is2 == null) {
return false;
+ }
- while( true ) {
+ while (true) {
int c1 = is1.read();
- while( ignoreWhitespace && isWhitespace( c1 ) )
+ while (ignoreWhitespace && isWhitespace(c1)) {
c1 = is1.read();
+ }
int c2 = is2.read();
- while( ignoreWhitespace && isWhitespace( c2 ) )
+ while (ignoreWhitespace && isWhitespace(c2)) {
c2 = is2.read();
- if( c1 == -1 && c2 == -1 )
+ }
+ if (c1 == -1 && c2 == -1) {
return true;
- if( c1 != c2 )
+ }
+ if (c1 != c2) {
break;
+ }
}
- } catch( FileNotFoundException e ) {
+ } catch (FileNotFoundException e) {
e.printStackTrace();
}
return false;
}
-
/**
* convert "From src insert blah blah" to "From src insert ... blah"
*/
@@ -425,11 +475,11 @@
int len = str.length();
int suffixlength = 20;
- if(len <= max)
+ if (len <= max) {
return str;
+ }
-
- suffixlength = Math.min(suffixlength, (max-3)/2);
+ suffixlength = Math.min(suffixlength, (max - 3) / 2);
String rev = StringUtils.reverse(str);
// get the last few words
@@ -437,19 +487,24 @@
suffix = StringUtils.reverse(suffix);
// first few ..
- String prefix = StringUtils.abbreviate(str, max-suffix.length());
+ String prefix = StringUtils.abbreviate(str, max - suffix.length());
- return prefix+suffix;
+ return prefix + suffix;
}
public final static String NSTR = "";
- public static enum streamStatus {EOF, TERMINATED}
- public static streamStatus readColumn(DataInput in, OutputStream out) throws IOException {
+
+ public static enum streamStatus {
+ EOF, TERMINATED
+ }
+
+ public static streamStatus readColumn(DataInput in, OutputStream out)
+ throws IOException {
while (true) {
int b;
try {
- b = (int)in.readByte();
+ b = in.readByte();
} catch (EOFException e) {
return streamStatus.EOF;
}
@@ -464,15 +519,17 @@
}
/**
- * Convert an output stream to a compressed output stream based on codecs
- * and compression options specified in the Job Configuration.
- * @param jc Job Configuration
- * @param out Output Stream to be converted into compressed output stream
+ * Convert an output stream to a compressed output stream based on codecs and
+ * compression options specified in the Job Configuration.
+ *
+ * @param jc
+ * Job Configuration
+ * @param out
+ * Output Stream to be converted into compressed output stream
* @return compressed output stream
*/
- public static OutputStream createCompressedStream(JobConf jc,
- OutputStream out)
- throws IOException {
+ public static OutputStream createCompressedStream(JobConf jc, OutputStream out)
+ throws IOException {
boolean isCompressed = FileOutputFormat.getCompressOutput(jc);
return createCompressedStream(jc, out, isCompressed);
}
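
A hypothetical caller that honours the job's compression settings when writing a plain file might look like this (the path, codec choice and written bytes are illustrative only, not part of this commit):

import java.io.OutputStream;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;

public class CompressedStreamDemo {
  public static void main(String[] args) throws Exception {
    JobConf jc = new JobConf();
    FileOutputFormat.setCompressOutput(jc, true);
    FileOutputFormat.setOutputCompressorClass(jc, GzipCodec.class);

    FileSystem fs = FileSystem.get(jc);
    Path out = new Path("/tmp/hive-demo/000000_0.gz");   // illustrative path

    // Wraps the raw stream with the configured codec because compression is on;
    // with compression off it would return the raw stream unchanged.
    OutputStream os = Utilities.createCompressedStream(jc, fs.create(out));
    os.write("hello\n".getBytes());
    os.close();
  }
}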
@@ -481,20 +538,22 @@
* Convert an output stream to a compressed output stream based on codecs
* codecs in the Job Configuration. Caller specifies directly whether file is
* compressed or not
- * @param jc Job Configuration
- * @param out Output Stream to be converted into compressed output stream
- * @param isCompressed whether the output stream needs to be compressed or not
+ *
+ * @param jc
+ * Job Configuration
+ * @param out
+ * Output Stream to be converted into compressed output stream
+ * @param isCompressed
+ * whether the output stream needs to be compressed or not
* @return compressed output stream
*/
public static OutputStream createCompressedStream(JobConf jc,
- OutputStream out,
- boolean isCompressed)
- throws IOException {
- if(isCompressed) {
- Class<? extends CompressionCodec> codecClass =
- FileOutputFormat.getOutputCompressorClass(jc, DefaultCodec.class);
- CompressionCodec codec = (CompressionCodec)
- ReflectionUtils.newInstance(codecClass, jc);
+ OutputStream out, boolean isCompressed) throws IOException {
+ if (isCompressed) {
+ Class<? extends CompressionCodec> codecClass = FileOutputFormat
+ .getOutputCompressorClass(jc, DefaultCodec.class);
+ CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(
+ codecClass, jc);
return codec.createOutputStream(out);
} else {
return (out);
@@ -502,74 +561,87 @@
}
/**
- * Based on compression option and configured output codec - get extension
- * for output file. This is only required for text files - not sequencefiles
- * @param jc Job Configuration
- * @param isCompressed Whether the output file is compressed or not
+ * Based on compression option and configured output codec - get extension for
+ * output file. This is only required for text files - not sequencefiles
+ *
+ * @param jc
+ * Job Configuration
+ * @param isCompressed
+ * Whether the output file is compressed or not
* @return the required file extension (example: .gz)
*/
public static String getFileExtension(JobConf jc, boolean isCompressed) {
- if(!isCompressed) {
+ if (!isCompressed) {
return "";
} else {
- Class<? extends CompressionCodec> codecClass =
- FileOutputFormat.getOutputCompressorClass(jc, DefaultCodec.class);
- CompressionCodec codec = (CompressionCodec)
- ReflectionUtils.newInstance(codecClass, jc);
+ Class<? extends CompressionCodec> codecClass = FileOutputFormat
+ .getOutputCompressorClass(jc, DefaultCodec.class);
+ CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(
+ codecClass, jc);
return codec.getDefaultExtension();
}
}
/**
* Create a sequencefile output stream based on job configuration
- * @param jc Job configuration
- * @param fs File System to create file in
- * @param file Path to be created
- * @param keyClass Java Class for key
- * @param valClass Java Class for value
+ *
+ * @param jc
+ * Job configuration
+ * @param fs
+ * File System to create file in
+ * @param file
+ * Path to be created
+ * @param keyClass
+ * Java Class for key
+ * @param valClass
+ * Java Class for value
* @return output stream over the created sequencefile
*/
- public static SequenceFile.Writer createSequenceWriter(JobConf jc, FileSystem fs,
- Path file, Class<?> keyClass,
- Class<?> valClass)
- throws IOException {
- boolean isCompressed = SequenceFileOutputFormat.getCompressOutput(jc);
+ public static SequenceFile.Writer createSequenceWriter(JobConf jc,
+ FileSystem fs, Path file, Class<?> keyClass, Class<?> valClass)
+ throws IOException {
+ boolean isCompressed = FileOutputFormat.getCompressOutput(jc);
return createSequenceWriter(jc, fs, file, keyClass, valClass, isCompressed);
}
/**
- * Create a sequencefile output stream based on job configuration
- * Uses user supplied compression flag (rather than obtaining it from the Job Configuration)
- * @param jc Job configuration
- * @param fs File System to create file in
- * @param file Path to be created
- * @param keyClass Java Class for key
- * @param valClass Java Class for value
+ * Create a sequencefile output stream based on job configuration Uses user
+ * supplied compression flag (rather than obtaining it from the Job
+ * Configuration)
+ *
+ * @param jc
+ * Job configuration
+ * @param fs
+ * File System to create file in
+ * @param file
+ * Path to be created
+ * @param keyClass
+ * Java Class for key
+ * @param valClass
+ * Java Class for value
* @return output stream over the created sequencefile
*/
- public static SequenceFile.Writer createSequenceWriter(JobConf jc, FileSystem fs,
- Path file, Class<?> keyClass,
- Class<?> valClass,
- boolean isCompressed)
- throws IOException {
+ public static SequenceFile.Writer createSequenceWriter(JobConf jc,
+ FileSystem fs, Path file, Class<?> keyClass, Class<?> valClass,
+ boolean isCompressed) throws IOException {
CompressionCodec codec = null;
CompressionType compressionType = CompressionType.NONE;
Class codecClass = null;
if (isCompressed) {
compressionType = SequenceFileOutputFormat.getOutputCompressionType(jc);
- codecClass = SequenceFileOutputFormat.getOutputCompressorClass(jc, DefaultCodec.class);
- codec = (CompressionCodec)
- ReflectionUtils.newInstance(codecClass, jc);
+ codecClass = FileOutputFormat.getOutputCompressorClass(jc,
+ DefaultCodec.class);
+ codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, jc);
}
- return (SequenceFile.createWriter(fs, jc, file,
- keyClass, valClass, compressionType, codec));
+ return (SequenceFile.createWriter(fs, jc, file, keyClass, valClass,
+ compressionType, codec));
}
/**
* Create a RCFile output stream based on job configuration Uses user supplied
* compression flag (rather than obtaining it from the Job Configuration)
- *
+ *
* @param jc
* Job configuration
* @param fs
@@ -593,9 +665,10 @@
/**
* Shamelessly cloned from GenericOptionsParser
*/
- public static String realFile(String newFile, Configuration conf) throws IOException {
+ public static String realFile(String newFile, Configuration conf)
+ throws IOException {
Path path = new Path(newFile);
- URI pathURI = path.toUri();
+ URI pathURI = path.toUri();
FileSystem fs;
if (pathURI.getScheme() == null) {
@@ -610,7 +683,9 @@
try {
fs.close();
- } catch(IOException e){};
+ } catch (IOException e) {
+ }
+ ;
String file = path.makeQualified(fs).toString();
// For compatibility with hadoop 0.17, change file:/a/b/c to file:///a/b/c
@@ -622,13 +697,18 @@
}
public static List<String> mergeUniqElems(List<String> src, List<String> dest) {
- if (dest == null) return src;
- if (src == null) return dest;
+ if (dest == null) {
+ return src;
+ }
+ if (src == null) {
+ return dest;
+ }
int pos = 0;
while (pos < dest.size()) {
- if (!src.contains(dest.get(pos)))
+ if (!src.contains(dest.get(pos))) {
src.add(dest.get(pos));
+ }
pos++;
}
@@ -638,8 +718,9 @@
private static final String tmpPrefix = "_tmp.";
public static Path toTempPath(Path orig) {
- if(orig.getName().indexOf(tmpPrefix) == 0)
+ if (orig.getName().indexOf(tmpPrefix) == 0) {
return orig;
+ }
return new Path(orig.getParent(), tmpPrefix + orig.getName());
}
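
toTempPath() simply prefixes the last path component with "_tmp." and leaves paths that already carry the prefix unchanged; for example (illustrative path, not part of this commit):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.Utilities;

public class ToTempPathDemo {
  public static void main(String[] args) {
    // /warehouse/t/000000_0  ->  /warehouse/t/_tmp.000000_0
    Path tmp = Utilities.toTempPath(new Path("/warehouse/t/000000_0"));
    System.out.println(tmp);
  }
}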
@@ -661,40 +742,49 @@
}
/**
- * Rename src to dst, or in the case dst already exists, move files in src
- * to dst. If there is an existing file with the same name, the new file's
- * name will be appended with "_1", "_2", etc.
- * @param fs the FileSystem where src and dst are on.
- * @param src the src directory
- * @param dst the target directory
+ * Rename src to dst, or in the case dst already exists, move files in src to
+ * dst. If there is an existing file with the same name, the new file's name
+ * will be appended with "_1", "_2", etc.
+ *
+ * @param fs
+ * the FileSystem where src and dst are on.
+ * @param src
+ * the src directory
+ * @param dst
+ * the target directory
* @throws IOException
*/
static public void rename(FileSystem fs, Path src, Path dst)
- throws IOException, HiveException {
+ throws IOException, HiveException {
if (!fs.rename(src, dst)) {
- throw new HiveException ("Unable to move: " + src + " to: " + dst);
+ throw new HiveException("Unable to move: " + src + " to: " + dst);
}
}
+
/**
- * Rename src to dst, or in the case dst already exists, move files in src
- * to dst. If there is an existing file with the same name, the new file's
- * name will be appended with "_1", "_2", etc.
- * @param fs the FileSystem where src and dst are on.
- * @param src the src directory
- * @param dst the target directory
+ * Rename src to dst, or in the case dst already exists, move files in src to
+ * dst. If there is an existing file with the same name, the new file's name
+ * will be appended with "_1", "_2", etc.
+ *
+ * @param fs
+ * the FileSystem where src and dst are on.
+ * @param src
+ * the src directory
+ * @param dst
+ * the target directory
* @throws IOException
*/
static public void renameOrMoveFiles(FileSystem fs, Path src, Path dst)
- throws IOException, HiveException {
+ throws IOException, HiveException {
if (!fs.exists(dst)) {
if (!fs.rename(src, dst)) {
- throw new HiveException ("Unable to move: " + src + " to: " + dst);
+ throw new HiveException("Unable to move: " + src + " to: " + dst);
}
} else {
// move file by file
FileStatus[] files = fs.listStatus(src);
- for (int i=0; i<files.length; i++) {
- Path srcFilePath = files[i].getPath();
+ for (FileStatus file : files) {
+ Path srcFilePath = file.getPath();
String fileName = srcFilePath.getName();
Path dstFilePath = new Path(dst, fileName);
if (fs.exists(dstFilePath)) {
@@ -705,26 +795,29 @@
} while (fs.exists(dstFilePath));
}
if (!fs.rename(srcFilePath, dstFilePath)) {
- throw new HiveException ("Unable to move: " + src + " to: " + dst);
+ throw new HiveException("Unable to move: " + src + " to: " + dst);
}
}
}
}
- /** The first group will contain the task id.
- * The second group is the optional extension.
- * The file name looks like: "24931_r_000000_0" or "24931_r_000000_0.gz"
+ /**
+ * The first group will contain the task id. The second group is the optional
+ * extension. The file name looks like: "24931_r_000000_0" or
+ * "24931_r_000000_0.gz"
*/
- static Pattern fileNameTaskIdRegex = Pattern.compile("^.*_([0-9]*)_[0-9](\\..*)?$");
+ static Pattern fileNameTaskIdRegex = Pattern
+ .compile("^.*_([0-9]*)_[0-9](\\..*)?$");
/**
- * Get the task id from the filename.
- * E.g., get "000000" out of "24931_r_000000_0" or "24931_r_000000_0.gz"
+ * Get the task id from the filename. E.g., get "000000" out of
+ * "24931_r_000000_0" or "24931_r_000000_0.gz"
*/
public static String getTaskIdFromFilename(String filename) {
Matcher m = fileNameTaskIdRegex.matcher(filename);
if (!m.matches()) {
- LOG.warn("Unable to get task id from file name: " + filename + ". Using full filename as task id.");
+ LOG.warn("Unable to get task id from file name: " + filename
+ + ". Using full filename as task id.");
return filename;
} else {
String taskId = m.group(1);
@@ -732,22 +825,27 @@
return taskId;
}
}
+
/**
- * Remove all temporary files and duplicate (double-committed) files from a given directory.
+ * Remove all temporary files and duplicate (double-committed) files from a
+ * given directory.
*/
- public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws IOException {
- if(path == null)
+ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path)
+ throws IOException {
+ if (path == null) {
return;
+ }
FileStatus items[] = fs.listStatus(path);
- if(items == null)
+ if (items == null) {
return;
+ }
HashMap<String, FileStatus> taskIdToFile = new HashMap<String, FileStatus>();
- for(FileStatus one: items) {
- if(isTempPath(one)) {
- if(!fs.delete(one.getPath(), true)) {
- throw new IOException ("Unable to delete tmp file: " + one.getPath());
+ for (FileStatus one : items) {
+ if (isTempPath(one)) {
+ if (!fs.delete(one.getPath(), true)) {
+ throw new IOException("Unable to delete tmp file: " + one.getPath());
}
} else {
String taskId = getTaskIdFromFilename(one.getPath().getName());
@@ -755,8 +853,8 @@
if (otherFile == null) {
taskIdToFile.put(taskId, one);
} else {
- if(!fs.delete(one.getPath(), true)) {
- throw new IOException ("Unable to delete duplicate file: "
+ if (!fs.delete(one.getPath(), true)) {
+ throw new IOException("Unable to delete duplicate file: "
+ one.getPath() + ". Existing file: " + otherFile.getPath());
} else {
LOG.warn("Duplicate taskid file removed: " + one.getPath()
@@ -768,30 +866,32 @@
}
public static String getNameMessage(Exception e) {
- return e.getClass().getName() + "(" + e.getMessage() + ")";
+ return e.getClass().getName() + "(" + e.getMessage() + ")";
}
/**
* Add new elements to the classpath
- *
+ *
* @param newPaths
* Array of classpath elements
*/
- public static ClassLoader addToClassPath(ClassLoader cloader, String[] newPaths) throws Exception {
- URLClassLoader loader = (URLClassLoader)cloader;
+ public static ClassLoader addToClassPath(ClassLoader cloader,
+ String[] newPaths) throws Exception {
+ URLClassLoader loader = (URLClassLoader) cloader;
List<URL> curPath = Arrays.asList(loader.getURLs());
ArrayList<URL> newPath = new ArrayList<URL>();
// get a list with the current classpath components
- for(URL onePath: curPath) {
+ for (URL onePath : curPath) {
newPath.add(onePath);
}
curPath = newPath;
for (String onestr : newPaths) {
// special processing for hadoop-17. file:// needs to be removed
- if (StringUtils.indexOf(onestr, "file://") == 0)
+ if (StringUtils.indexOf(onestr, "file://") == 0) {
onestr = StringUtils.substring(onestr, 7);
+ }
URL oneurl = (new File(onestr)).toURL();
if (!curPath.contains(oneurl)) {
@@ -804,19 +904,21 @@
/**
* remove elements from the classpath
- *
+ *
* @param pathsToRemove
* Array of classpath elements
*/
- public static void removeFromClassPath(String[] pathsToRemove) throws Exception {
+ public static void removeFromClassPath(String[] pathsToRemove)
+ throws Exception {
Thread curThread = Thread.currentThread();
URLClassLoader loader = (URLClassLoader) curThread.getContextClassLoader();
Set<URL> newPath = new HashSet<URL>(Arrays.asList(loader.getURLs()));
for (String onestr : pathsToRemove) {
// special processing for hadoop-17. file:// needs to be removed
- if (StringUtils.indexOf(onestr, "file://") == 0)
+ if (StringUtils.indexOf(onestr, "file://") == 0) {
onestr = StringUtils.substring(onestr, 7);
+ }
URL oneurl = (new File(onestr)).toURL();
newPath.remove(oneurl);
@@ -843,35 +945,38 @@
return names;
}
- public static List<String> getColumnNamesFromFieldSchema(List<FieldSchema> partCols) {
+ public static List<String> getColumnNamesFromFieldSchema(
+ List<FieldSchema> partCols) {
List<String> names = new ArrayList<String>();
for (FieldSchema o : partCols) {
names.add(o.getName());
}
return names;
}
-
+
public static List<String> getColumnNames(Properties props) {
List<String> names = new ArrayList<String>();
String colNames = props.getProperty(Constants.LIST_COLUMNS);
String[] cols = colNames.trim().split(",");
if (cols != null) {
- for(String col : cols) {
- if(col!=null && !col.trim().equals(""))
+ for (String col : cols) {
+ if (col != null && !col.trim().equals("")) {
names.add(col);
+ }
}
}
return names;
}
-
+
public static List<String> getColumnTypes(Properties props) {
List<String> names = new ArrayList<String>();
String colNames = props.getProperty(Constants.LIST_COLUMN_TYPES);
String[] cols = colNames.trim().split(",");
if (cols != null) {
- for(String col : cols) {
- if(col!=null && !col.trim().equals(""))
+ for (String col : cols) {
+ if (col != null && !col.trim().equals("")) {
names.add(col);
+ }
}
}
return names;
@@ -891,21 +996,23 @@
break;
}
}
- if (!found)
+ if (!found) {
throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg());
+ }
}
}
/**
* Gets the default notification interval to send progress updates to the
* tracker. Useful for operators that may not output data for a while.
- *
+ *
* @param hconf
* @return the interval in milliseconds
*/
public static int getDefaultNotificationInterval(Configuration hconf) {
int notificationInterval;
- Integer expInterval = Integer.decode(hconf.get("mapred.tasktracker.expiry.interval"));
+ Integer expInterval = Integer.decode(hconf
+ .get("mapred.tasktracker.expiry.interval"));
if (expInterval != null) {
notificationInterval = expInterval.intValue() / 2;
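
As a quick illustration of the task-id pattern introduced above, the following standalone sketch (class and method names are made up for illustration; only the regular expression is taken from the hunk) shows what getTaskIdFromFilename-style matching produces for the example file names mentioned in the javadoc:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class TaskIdRegexSketch {
      // Same pattern as Utilities.fileNameTaskIdRegex above.
      private static final Pattern FILE_NAME_TASK_ID =
          Pattern.compile("^.*_([0-9]*)_[0-9](\\..*)?$");

      // Mirrors getTaskIdFromFilename(): fall back to the full name on no match.
      static String taskIdOf(String filename) {
        Matcher m = FILE_NAME_TASK_ID.matcher(filename);
        return m.matches() ? m.group(1) : filename;
      }

      public static void main(String[] args) {
        System.out.println(taskIdOf("24931_r_000000_0"));    // 000000
        System.out.println(taskIdOf("24931_r_000000_0.gz")); // 000000
        System.out.println(taskIdOf("not-a-task-output"));   // not-a-task-output
      }
    }
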
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/description.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/description.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/description.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/description.java Thu Jan 21 10:37:58 2010
@@ -24,6 +24,8 @@
@Retention(RetentionPolicy.RUNTIME)
public @interface description {
String value() default "_FUNC_ is undocumented";
+
String extended() default "";
+
String name() default "";
}
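
The description annotation above is retained at runtime (RetentionPolicy.RUNTIME) so that function documentation can be read back from UDF/UDAF classes. A hedged sketch of how a function class might use it; the function name, help strings, and the UDFMyUpper class are illustrative only and not part of this commit:

    import org.apache.hadoop.hive.ql.exec.UDF;
    import org.apache.hadoop.hive.ql.exec.description;
    import org.apache.hadoop.io.Text;

    @description(
        name = "my_upper",
        value = "_FUNC_(str) - Returns str with all characters upper-cased",
        extended = "Example:\n  > SELECT _FUNC_('hive') FROM src LIMIT 1;\n  'HIVE'")
    public class UDFMyUpper extends UDF {
      public Text evaluate(Text s) {
        return s == null ? null : new Text(s.toString().toUpperCase());
      }
    }
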
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/DCLLItem.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/DCLLItem.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/DCLLItem.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/DCLLItem.java Thu Jan 21 10:37:58 2010
@@ -19,77 +19,87 @@
package org.apache.hadoop.hive.ql.exec.persistence;
/**
- * Doubly circular linked list item.
+ * Doubly circular linked list item.
*/
public class DCLLItem {
-
+
DCLLItem prev;
DCLLItem next;
-
+
DCLLItem() {
prev = next = this;
}
-
+
/**
* Get the next item.
+ *
* @return the next item.
*/
- public DCLLItem getNext() {
- return next;
+ public DCLLItem getNext() {
+ return next;
}
-
+
/**
* Get the previous item.
+ *
* @return the previous item.
*/
- public DCLLItem getPrev() {
- return prev;
+ public DCLLItem getPrev() {
+ return prev;
}
-
+
/**
* Set the next item as itm.
- * @param itm the item to be set as next.
+ *
+ * @param itm
+ * the item to be set as next.
*/
- public void setNext(DCLLItem itm) {
- next = itm;
+ public void setNext(DCLLItem itm) {
+ next = itm;
}
-
+
/**
* Set the previous item as itm
- * @param itm the item to be set as previous.
+ *
+ * @param itm
+ * the item to be set as previous.
*/
- public void setPrev(DCLLItem itm) {
- prev = itm;
+ public void setPrev(DCLLItem itm) {
+ prev = itm;
}
-
+
/**
* Remove the current item from the doubly circular linked list.
*/
public void remove() {
- next.prev = this.prev;
- prev.next = this.next;
- this.prev = this.next = null;
+ next.prev = prev;
+ prev.next = next;
+ prev = next = null;
}
-
+
/**
* Add v as the previous of the current list item.
- * @param v inserted item.
+ *
+ * @param v
+ * inserted item.
*/
public void insertBefore(DCLLItem v) {
- this.prev.next = v;
- v.prev = this.prev;
+ prev.next = v;
+ v.prev = prev;
v.next = this;
- this.prev = v;
+ prev = v;
}
-
+
/**
* Add v as the next of the current list item.
- * @param v inserted item.
+ *
+ * @param v
+ * inserted item.
*/
public void insertAfter(DCLLItem v) {
- this.next.prev = v;
- v.next = this.next;
+ next.prev = v;
+ v.next = next;
v.prev = this;
- this.next = v;
+ next = v;
}
}
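
Because DCLLItem's constructor is package-private, callers extend it (as HashMapWrapper.MRUItem does further down in this commit). A minimal sketch, assuming a hypothetical subclass in the same package, of how insertAfter() and remove() keep the ring consistent:

    package org.apache.hadoop.hive.ql.exec.persistence;

    // NamedItem and DCLLItemSketch are illustrative only.
    class NamedItem extends DCLLItem {
      final String name;
      NamedItem(String name) { this.name = name; }
    }

    class DCLLItemSketch {
      public static void main(String[] args) {
        NamedItem a = new NamedItem("a");  // a new item is a ring of one
        NamedItem b = new NamedItem("b");
        NamedItem c = new NamedItem("c");
        a.insertAfter(b);                  // ring: a -> b -> a
        b.insertAfter(c);                  // ring: a -> b -> c -> a
        b.remove();                        // ring: a -> c -> a; b is detached
        System.out.println(((NamedItem) a.getNext()).name);  // prints c
      }
    }
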
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java Thu Jan 21 10:37:58 2010
@@ -20,169 +20,180 @@
import java.io.File;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
-import java.util.HashSet;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.util.jdbm.RecordManager;
import org.apache.hadoop.hive.ql.util.jdbm.RecordManagerFactory;
import org.apache.hadoop.hive.ql.util.jdbm.RecordManagerOptions;
-import org.apache.hadoop.hive.ql.util.jdbm.htree.HTree;
import org.apache.hadoop.hive.ql.util.jdbm.helper.FastIterator;
-import org.apache.hadoop.hive.ql.exec.persistence.MRU;
-import org.apache.hadoop.hive.ql.exec.persistence.DCLLItem;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.util.jdbm.htree.HTree;
/**
- * Simple wrapper for persistent Hashmap implementing only the put/get/remove/clear interface.
- * The main memory hash table acts as a cache and all put/get will operate on it first. If the
- * size of the main memory hash table exceeds a certain threshold, new elements will go into
- * the persistent hash table.
+ * Simple wrapper for persistent Hashmap implementing only the
+ * put/get/remove/clear interface. The main memory hash table acts as a cache
+ * and all put/get will operate on it first. If the size of the main memory hash
+ * table exceeds a certain threshold, new elements will go into the persistent
+ * hash table.
*/
-public class HashMapWrapper<K,V> {
-
+public class HashMapWrapper<K, V> {
+
protected Log LOG = LogFactory.getLog(this.getClass().getName());
-
+
// default threshold for using main memory based HashMap
private static final int THRESHOLD = 25000;
-
- private int threshold; // threshold to put data into persistent hash table instead
- private HashMap<K,MRUItem> mHash; // main memory HashMap
- private HTree pHash; // persistent HashMap
- private RecordManager recman; // record manager required by HTree
- private File tmpFile; // temp file holding the persistent data from record manager.
- private MRU<MRUItem> MRUList; // MRU cache entry
-
+
+ private int threshold; // threshold to put data into persistent hash table
+ // instead
+ private HashMap<K, MRUItem> mHash; // main memory HashMap
+ private HTree pHash; // persistent HashMap
+ private RecordManager recman; // record manager required by HTree
+ private File tmpFile; // temp file holding the persistent data from record
+ // manager.
+ private MRU<MRUItem> MRUList; // MRU cache entry
+
/**
- * Doubly linked list of value items.
- * Note: this is only used along with memory hash table. Persistent hash stores the value directory.
+ * Doubly linked list of value items. Note: this is only used along with the
+ * memory hash table. The persistent hash stores the value directly.
*/
class MRUItem extends DCLLItem {
K key;
V value;
-
+
MRUItem(K k, V v) {
key = k;
value = v;
}
}
-
+
/**
* Constructor.
- * @param threshold User specified threshold to store new values into persistent storage.
+ *
+ * @param threshold
+ * User specified threshold to store new values into persistent
+ * storage.
*/
public HashMapWrapper(int threshold) {
this.threshold = threshold;
this.pHash = null;
this.recman = null;
this.tmpFile = null;
- mHash = new HashMap<K,MRUItem>();
+ mHash = new HashMap<K, MRUItem>();
MRUList = new MRU<MRUItem>();
}
-
- public HashMapWrapper () {
+
+ public HashMapWrapper() {
this(THRESHOLD);
}
-
+
/**
- * Get the value based on the key. We try to get it from the main memory hash table first.
- * If it is not there we will look up the persistent hash table. This function also guarantees
- * if any item is found given a key, it is available in main memory HashMap. So mutating the
- * returned value will be reflected (saved) in HashMapWrapper.
+ * Get the value based on the key. We try to get it from the main memory hash
+ * table first. If it is not there we will look up the persistent hash table.
+ * This function also guarantees that if an item is found for a given key, it
+ * is available in the main memory HashMap, so mutating the returned value will be
+ * reflected (saved) in HashMapWrapper.
+ *
* @param key
- * @return Value corresponding to the key. If the key is not found, return null.
+ * @return Value corresponding to the key. If the key is not found, return
+ * null.
*/
public V get(K key) throws HiveException {
V value = null;
-
+
// if not the MRU, searching the main memory hash table.
MRUItem item = mHash.get(key);
- if ( item != null ) {
+ if (item != null) {
value = item.value;
MRUList.moveToHead(item);
- } else if ( pHash != null ) {
+ } else if (pHash != null) {
try {
value = (V) pHash.get(key);
- if ( value != null ) {
- if ( mHash.size() < threshold ) {
+ if (value != null) {
+ if (mHash.size() < threshold) {
mHash.put(key, new MRUItem(key, value));
- pHash.remove(key);
- } else if ( threshold > 0 ) { // flush the LRU to disk
+ pHash.remove(key);
+ } else if (threshold > 0) { // flush the LRU to disk
MRUItem tail = MRUList.tail(); // least recently used item
- pHash.put(tail.key, tail.value);
- pHash.remove(key);
- recman.commit();
-
- // update mHash -- reuse MRUItem
- item = mHash.remove(tail.key);
- item.key = key;
- item.value = value;
- mHash.put(key, item);
-
+ pHash.put(tail.key, tail.value);
+ pHash.remove(key);
+ recman.commit();
+
+ // update mHash -- reuse MRUItem
+ item = mHash.remove(tail.key);
+ item.key = key;
+ item.value = value;
+ mHash.put(key, item);
+
// update MRU -- reusing MRUItem
- tail.key = key;
- tail.value = value;
- MRUList.moveToHead(tail);
+ tail.key = key;
+ tail.value = value;
+ MRUList.moveToHead(tail);
}
}
- } catch ( Exception e ) {
+ } catch (Exception e) {
LOG.warn(e.toString());
throw new HiveException(e);
}
- }
+ }
return value;
}
-
+
/**
- * Put the key value pair in the hash table. It will first try to
- * put it into the main memory hash table. If the size exceeds the
- * threshold, it will put it into the persistent hash table.
+ * Put the key value pair in the hash table. It will first try to put it into
+ * the main memory hash table. If the size exceeds the threshold, it will put
+ * it into the persistent hash table.
+ *
* @param key
* @param value
* @throws HiveException
*/
- public void put(K key, V value) throws HiveException {
+ public void put(K key, V value) throws HiveException {
int mm_size = mHash.size();
MRUItem itm = mHash.get(key);
-
+
if (mm_size < threshold) {
- if ( itm != null ) {
+ if (itm != null) {
// re-use the MRU item -- just overwrite value, key is the same
itm.value = value;
- MRUList.moveToHead(itm);
- if (!mHash.get(key).value.equals(value))
- LOG.error("HashMapWrapper.put() reuse MRUItem inconsistency [1].");
- assert(mHash.get(key).value.equals(value));
+ MRUList.moveToHead(itm);
+ if (!mHash.get(key).value.equals(value)) {
+ LOG.error("HashMapWrapper.put() reuse MRUItem inconsistency [1].");
+ }
+ assert (mHash.get(key).value.equals(value));
} else {
// check if key already exists in pHash
try {
- if ( pHash != null && pHash.get(key) != null ) {
+ if (pHash != null && pHash.get(key) != null) {
// remove the old item from pHash and insert the new one
pHash.remove(key);
pHash.put(key, value);
recman.commit();
- return;
+ return;
}
} catch (Exception e) {
e.printStackTrace();
throw new HiveException(e);
}
- itm = new MRUItem(key,value);
+ itm = new MRUItem(key, value);
MRUList.put(itm);
- mHash.put(key, itm);
+ mHash.put(key, itm);
}
} else {
- if ( itm != null ) { // replace existing item
+ if (itm != null) { // replace existing item
// re-use the MRU item -- just overwrite value, key is the same
itm.value = value;
- MRUList.moveToHead(itm);
- if (!mHash.get(key).value.equals(value))
- LOG.error("HashMapWrapper.put() reuse MRUItem inconsistency [2].");
- assert(mHash.get(key).value.equals(value));
+ MRUList.moveToHead(itm);
+ if (!mHash.get(key).value.equals(value)) {
+ LOG.error("HashMapWrapper.put() reuse MRUItem inconsistency [2].");
+ }
+ assert (mHash.get(key).value.equals(value));
} else {
- // for items inserted into persistent hash table, we don't put it into MRU
+ // for items inserted into persistent hash table, we don't put it into
+ // MRU
if (pHash == null) {
pHash = getPersistentHash();
}
@@ -196,62 +207,67 @@
}
}
}
-
+
/**
* Get the persistent hash table.
+ *
* @return persistent hash table
* @throws HiveException
*/
private HTree getPersistentHash() throws HiveException {
try {
- // Create a temporary file for the page manager to hold persistent data.
- if ( tmpFile != null ) {
- tmpFile.delete();
+ // Create a temporary file for the page manager to hold persistent data.
+ if (tmpFile != null) {
+ tmpFile.delete();
}
tmpFile = File.createTempFile("HashMapWrapper", ".tmp", new File("/tmp"));
LOG.info("HashMapWrapper created temp file " + tmpFile.getAbsolutePath());
- // Delete the temp file if the JVM terminate normally through Hadoop job kill command.
+ // Delete the temp file if the JVM terminates normally, e.g. through the
+ // Hadoop job kill command.
// Caveat: it won't be deleted if JVM is killed by 'kill -9'.
- tmpFile.deleteOnExit();
-
+ tmpFile.deleteOnExit();
+
Properties props = new Properties();
- props.setProperty(RecordManagerOptions.CACHE_TYPE, RecordManagerOptions.NO_CACHE);
- props.setProperty(RecordManagerOptions.DISABLE_TRANSACTIONS, "true" );
-
- recman = RecordManagerFactory.createRecordManager(tmpFile, props );
+ props.setProperty(RecordManagerOptions.CACHE_TYPE,
+ RecordManagerOptions.NO_CACHE);
+ props.setProperty(RecordManagerOptions.DISABLE_TRANSACTIONS, "true");
+
+ recman = RecordManagerFactory.createRecordManager(tmpFile, props);
pHash = HTree.createInstance(recman);
} catch (Exception e) {
LOG.warn(e.toString());
throw new HiveException(e);
- }
+ }
return pHash;
}
-
+
/**
- * Clean up the hash table. All elements in the main memory hash table will be removed, and
- * the persistent hash table will be destroyed (temporary file will be deleted).
+ * Clean up the hash table. All elements in the main memory hash table will be
+ * removed, and the persistent hash table will be destroyed (temporary file
+ * will be deleted).
*/
public void clear() throws HiveException {
- if ( mHash != null ) {
+ if (mHash != null) {
mHash.clear();
MRUList.clear();
}
close();
}
-
+
/**
- * Remove one key-value pairs from the hash table based on the given key. If the pairs are
- * removed from the main memory hash table, pairs in the persistent hash table will not be
- * moved to the main memory hash table. Future inserted elements will go into the main memory
- * hash table though.
+ * Remove one key-value pair from the hash table based on the given key. If
+ * the pair is removed from the main memory hash table, pairs in the
+ * persistent hash table will not be moved to the main memory hash table.
+ * Future inserted elements will go into the main memory hash table though.
+ *
* @param key
* @throws HiveException
*/
public void remove(Object key) throws HiveException {
MRUItem entry = mHash.remove(key);
- if ( entry != null ) {
+ if (entry != null) {
MRUList.remove(entry);
- } else if ( pHash != null ) {
+ } else if (pHash != null) {
try {
pHash.remove(key);
} catch (Exception e) {
@@ -260,58 +276,64 @@
}
}
}
-
+
/**
* Get a list of all keys in the hash map.
+ *
* @return
*/
public Set<K> keySet() {
HashSet<K> ret = null;
- if ( mHash != null ) {
+ if (mHash != null) {
ret = new HashSet<K>();
ret.addAll(mHash.keySet());
}
- if ( pHash != null ) {
+ if (pHash != null) {
try {
FastIterator fitr = pHash.keys();
- if ( fitr != null ) {
- K k;
- while ( (k = (K) fitr.next()) != null )
- ret.add(k);
- }
- } catch (Exception e) {
+ if (fitr != null) {
+ K k;
+ while ((k = (K) fitr.next()) != null) {
+ ret.add(k);
+ }
+ }
+ } catch (Exception e) {
e.printStackTrace();
- }
+ }
}
return ret;
}
-
+
/**
- * Get the main memory cache capacity.
- * @return the maximum number of items can be put into main memory HashMap cache.
+ * Get the main memory cache capacity.
+ *
+ * @return the maximum number of items that can be put into the main memory
+ *         HashMap cache.
*/
public int cacheSize() {
return threshold;
}
-
+
/**
* Close the persistent hash table and clean it up.
+ *
* @throws HiveException
*/
public void close() throws HiveException {
-
- if ( pHash != null ) {
+
+ if (pHash != null) {
try {
- if ( recman != null )
+ if (recman != null) {
recman.close();
- } catch (Exception e) {
+ }
+ } catch (Exception e) {
throw new HiveException(e);
}
// delete the temporary file
tmpFile.delete();
tmpFile = null;
- pHash = null;
- recman = null;
+ pHash = null;
+ recman = null;
}
}
}
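
A hedged usage sketch of the wrapper above: with a small threshold, the main-memory HashMap fills first and later puts spill into the JDBM-backed HTree held in a temp file, while the MRU list (next file in this commit) picks which entry is swapped out on a disk hit. The HashMapWrapperSketch class, keys, and values are illustrative only:

    import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
    import org.apache.hadoop.hive.ql.metadata.HiveException;

    public class HashMapWrapperSketch {
      public static void main(String[] args) throws HiveException {
        // Threshold of 2 forces an early spill to the persistent hash table.
        HashMapWrapper<String, Integer> cache =
            new HashMapWrapper<String, Integer>(2);
        try {
          cache.put("a", 1);
          cache.put("b", 2);
          cache.put("c", 3);            // main memory full: lands in the HTree
          Integer v = cache.get("c");   // read from disk; the LRU entry is swapped out
          System.out.println(v + " " + cache.keySet());
        } finally {
          cache.clear();                // clears both tables, deletes the temp file
        }
      }
    }
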
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MRU.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MRU.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MRU.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MRU.java Thu Jan 21 10:37:58 2010
@@ -18,21 +18,20 @@
package org.apache.hadoop.hive.ql.exec.persistence;
-import org.apache.hadoop.hive.ql.exec.persistence.DCLLItem;
/**
- * An MRU (Most Recently Used) cache implementation.
- * This implementation maintains a doubly circular linked list and it can be used
- * with an auxiliary data structure such as a HashMap to locate the item quickly.
+ * An MRU (Most Recently Used) cache implementation. This implementation
+ * maintains a doubly circular linked list and it can be used with an auxiliary
+ * data structure such as a HashMap to locate the item quickly.
*/
public class MRU<T extends DCLLItem> {
-
- T head; // head of the linked list -- MRU; tail (head.prev) will be the LRU
-
+
+ T head; // head of the linked list -- MRU; tail (head.prev) will be the LRU
+
public MRU() {
head = null;
}
-
+
/**
* Insert a value into the MRU. It will appear as the head.
*/
@@ -40,16 +39,19 @@
addToHead(item);
return item;
}
-
+
/**
* Remove an item from the MRU list.
- * @param v linked list item.
- */
+ *
+ * @param v
+ * linked list item.
+ */
public void remove(T v) {
- if (v == null)
+ if (v == null) {
return;
- if ( v == head ) {
- if ( head != head.getNext()) {
+ }
+ if (v == head) {
+ if (head != head.getNext()) {
head = (T) head.getNext();
} else {
head = null;
@@ -57,56 +59,61 @@
}
v.remove();
}
-
+
/**
* Get the most recently used.
+ *
* @return the most recently used item.
*/
- public T head() {
+ public T head() {
return head;
}
-
+
/**
* Get the least recently used.
+ *
* @return the least recently used item.
*/
public T tail() {
return (T) head.getPrev();
}
-
+
/**
* Insert a new item as the head
- * @param v the new linked list item to be added to the head.
+ *
+ * @param v
+ * the new linked list item to be added to the head.
*/
private void addToHead(T v) {
- if ( head == null ) {
+ if (head == null) {
head = v;
- } else {
+ } else {
head.insertBefore(v);
head = v;
}
}
-
-
+
/**
- * Move an existing item to the head.
- * @param v the linked list item to be moved to the head.
+ * Move an existing item to the head.
+ *
+ * @param v
+ * the linked list item to be moved to the head.
*/
public void moveToHead(T v) {
- assert(head != null);
- if ( head != v ) {
+ assert (head != null);
+ if (head != v) {
v.remove();
head.insertBefore(v);
head = v;
}
}
-
+
/**
- * Clear all elements in the MRU list.
- * This is not very efficient (linear) since it will call remove() to every item in the list.
+ * Clear all elements in the MRU list. This is not very efficient (linear)
+ * since it will call remove() on every item in the list.
*/
public void clear() {
- while ( head.getNext() != head ) {
+ while (head.getNext() != head) {
head.getNext().remove();
}
head.remove();