Posted to commits@hive.apache.org by zs...@apache.org on 2010/01/21 11:38:15 UTC
svn commit: r901644 [8/37] - in /hadoop/hive/trunk: ./
ql/src/java/org/apache/hadoop/hive/ql/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/
ql/src/java/org/apache/hadoop/hive/ql/history/ ql/src/java...
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Thu Jan 21 10:37:58 2010
@@ -18,15 +18,29 @@
package org.apache.hadoop.hive.ql.exec;
-import java.util.*;
-import java.io.*;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.plan.*;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.plan.collectDesc;
+import org.apache.hadoop.hive.ql.plan.extractDesc;
+import org.apache.hadoop.hive.ql.plan.fileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.filterDesc;
+import org.apache.hadoop.hive.ql.plan.forwardDesc;
+import org.apache.hadoop.hive.ql.plan.groupByDesc;
+import org.apache.hadoop.hive.ql.plan.joinDesc;
+import org.apache.hadoop.hive.ql.plan.lateralViewJoinDesc;
+import org.apache.hadoop.hive.ql.plan.limitDesc;
+import org.apache.hadoop.hive.ql.plan.mapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.reduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.scriptDesc;
+import org.apache.hadoop.hive.ql.plan.selectDesc;
+import org.apache.hadoop.hive.ql.plan.tableScanDesc;
+import org.apache.hadoop.hive.ql.plan.udtfDesc;
+import org.apache.hadoop.hive.ql.plan.unionDesc;
public class OperatorFactory {
-
+
public final static class opTuple<T extends Serializable> {
public Class<T> descClass;
public Class<? extends Operator<T>> opClass;
@@ -39,32 +53,40 @@
public static ArrayList<opTuple> opvec;
static {
- opvec = new ArrayList<opTuple> ();
- opvec.add(new opTuple<filterDesc> (filterDesc.class, FilterOperator.class));
- opvec.add(new opTuple<selectDesc> (selectDesc.class, SelectOperator.class));
- opvec.add(new opTuple<forwardDesc> (forwardDesc.class, ForwardOperator.class));
- opvec.add(new opTuple<fileSinkDesc> (fileSinkDesc.class, FileSinkOperator.class));
- opvec.add(new opTuple<collectDesc> (collectDesc.class, CollectOperator.class));
- opvec.add(new opTuple<scriptDesc> (scriptDesc.class, ScriptOperator.class));
- opvec.add(new opTuple<reduceSinkDesc> (reduceSinkDesc.class, ReduceSinkOperator.class));
- opvec.add(new opTuple<extractDesc> (extractDesc.class, ExtractOperator.class));
- opvec.add(new opTuple<groupByDesc> (groupByDesc.class, GroupByOperator.class));
- opvec.add(new opTuple<joinDesc> (joinDesc.class, JoinOperator.class));
- opvec.add(new opTuple<mapJoinDesc> (mapJoinDesc.class, MapJoinOperator.class));
- opvec.add(new opTuple<limitDesc> (limitDesc.class, LimitOperator.class));
- opvec.add(new opTuple<tableScanDesc> (tableScanDesc.class, TableScanOperator.class));
- opvec.add(new opTuple<unionDesc> (unionDesc.class, UnionOperator.class));
- opvec.add(new opTuple<udtfDesc> (udtfDesc.class, UDTFOperator.class));
- opvec.add(new opTuple<lateralViewJoinDesc>(lateralViewJoinDesc.class, LateralViewJoinOperator.class));
+ opvec = new ArrayList<opTuple>();
+ opvec.add(new opTuple<filterDesc>(filterDesc.class, FilterOperator.class));
+ opvec.add(new opTuple<selectDesc>(selectDesc.class, SelectOperator.class));
+ opvec
+ .add(new opTuple<forwardDesc>(forwardDesc.class, ForwardOperator.class));
+ opvec.add(new opTuple<fileSinkDesc>(fileSinkDesc.class,
+ FileSinkOperator.class));
+ opvec
+ .add(new opTuple<collectDesc>(collectDesc.class, CollectOperator.class));
+ opvec.add(new opTuple<scriptDesc>(scriptDesc.class, ScriptOperator.class));
+ opvec.add(new opTuple<reduceSinkDesc>(reduceSinkDesc.class,
+ ReduceSinkOperator.class));
+ opvec
+ .add(new opTuple<extractDesc>(extractDesc.class, ExtractOperator.class));
+ opvec
+ .add(new opTuple<groupByDesc>(groupByDesc.class, GroupByOperator.class));
+ opvec.add(new opTuple<joinDesc>(joinDesc.class, JoinOperator.class));
+ opvec
+ .add(new opTuple<mapJoinDesc>(mapJoinDesc.class, MapJoinOperator.class));
+ opvec.add(new opTuple<limitDesc>(limitDesc.class, LimitOperator.class));
+ opvec.add(new opTuple<tableScanDesc>(tableScanDesc.class,
+ TableScanOperator.class));
+ opvec.add(new opTuple<unionDesc>(unionDesc.class, UnionOperator.class));
+ opvec.add(new opTuple<udtfDesc>(udtfDesc.class, UDTFOperator.class));
+ opvec.add(new opTuple<lateralViewJoinDesc>(lateralViewJoinDesc.class,
+ LateralViewJoinOperator.class));
}
-
public static <T extends Serializable> Operator<T> get(Class<T> opClass) {
-
- for(opTuple o: opvec) {
- if(o.descClass == opClass) {
+
+ for (opTuple o : opvec) {
+ if (o.descClass == opClass) {
try {
- Operator<T> op = (Operator<T>)o.opClass.newInstance();
+ Operator<T> op = (Operator<T>) o.opClass.newInstance();
op.initializeCounters();
return op;
} catch (Exception e) {
@@ -73,33 +95,37 @@
}
}
}
- throw new RuntimeException ("No operator for descriptor class " + opClass.getName());
+ throw new RuntimeException("No operator for descriptor class "
+ + opClass.getName());
}
- public static <T extends Serializable> Operator<T> get(Class<T> opClass, RowSchema rwsch) {
-
+ public static <T extends Serializable> Operator<T> get(Class<T> opClass,
+ RowSchema rwsch) {
+
Operator<T> ret = get(opClass);
ret.setSchema(rwsch);
return ret;
}
/**
- * Returns an operator given the conf and a list of children operators.
+ * Returns an operator given the conf and a list of children operators.
*/
- public static <T extends Serializable> Operator<T> get(T conf, Operator<? extends Serializable> ... oplist) {
- Operator<T> ret = get((Class <T>)conf.getClass());
+ public static <T extends Serializable> Operator<T> get(T conf,
+ Operator<? extends Serializable>... oplist) {
+ Operator<T> ret = get((Class<T>) conf.getClass());
ret.setConf(conf);
- if(oplist.length == 0)
+ if (oplist.length == 0) {
return (ret);
+ }
- ArrayList<Operator<? extends Serializable>> clist = new ArrayList<Operator<? extends Serializable>> ();
- for(Operator op: oplist) {
+ ArrayList<Operator<? extends Serializable>> clist = new ArrayList<Operator<? extends Serializable>>();
+ for (Operator op : oplist) {
clist.add(op);
}
ret.setChildOperators(clist);
-
+
// Add this parent to the children
- for(Operator op: oplist) {
+ for (Operator op : oplist) {
List<Operator<? extends Serializable>> parents = op.getParentOperators();
if (parents == null) {
parents = new ArrayList<Operator<? extends Serializable>>();
@@ -111,25 +137,28 @@
}
/**
- * Returns an operator given the conf and a list of children operators.
+ * Returns an operator given the conf and a list of children operators.
*/
- public static <T extends Serializable> Operator<T> get(T conf, RowSchema rwsch, Operator ... oplist) {
+ public static <T extends Serializable> Operator<T> get(T conf,
+ RowSchema rwsch, Operator... oplist) {
Operator<T> ret = get(conf, oplist);
ret.setSchema(rwsch);
return (ret);
}
/**
- * Returns an operator given the conf and a list of parent operators.
+ * Returns an operator given the conf and a list of parent operators.
*/
- public static <T extends Serializable> Operator<T> getAndMakeChild(T conf, Operator ... oplist) {
- Operator<T> ret = get((Class <T>)conf.getClass());
+ public static <T extends Serializable> Operator<T> getAndMakeChild(T conf,
+ Operator... oplist) {
+ Operator<T> ret = get((Class<T>) conf.getClass());
ret.setConf(conf);
- if(oplist.length == 0)
+ if (oplist.length == 0) {
return (ret);
+ }
// Add the new operator as child of each of the passed in operators
- for(Operator op: oplist) {
+ for (Operator op : oplist) {
List<Operator> children = op.getChildOperators();
if (children == null) {
children = new ArrayList<Operator>();
@@ -140,18 +169,20 @@
// add parents for the newly created operator
List<Operator<? extends Serializable>> parent = new ArrayList<Operator<? extends Serializable>>();
- for(Operator op: oplist)
+ for (Operator op : oplist) {
parent.add(op);
-
+ }
+
ret.setParentOperators(parent);
return (ret);
}
/**
- * Returns an operator given the conf and a list of parent operators.
+ * Returns an operator given the conf and a list of parent operators.
*/
- public static <T extends Serializable> Operator<T> getAndMakeChild(T conf, RowSchema rwsch, Operator ... oplist) {
+ public static <T extends Serializable> Operator<T> getAndMakeChild(T conf,
+ RowSchema rwsch, Operator... oplist) {
Operator<T> ret = getAndMakeChild(conf, oplist);
ret.setSchema(rwsch);
return (ret);
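
For readers following this diff: the OperatorFactory hunk above keeps two pieces intact, a linear registry (opvec) that maps each descriptor class to its operator class, and the get/getAndMakeChild overloads that instantiate an operator and wire parent/child links. The following is a minimal, hypothetical sketch of that registry-lookup pattern in isolation; the DescA, OpA, SimpleOperator, and MiniOperatorFactory names are illustrative stand-ins, not Hive classes, and the real factory additionally calls initializeCounters() and wires RowSchema and parent/child lists as shown in the hunk.

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-ins for Hive's descriptor and operator hierarchy.
class DescA implements Serializable {}

class SimpleOperator<T extends Serializable> {
  private T conf;
  void setConf(T conf) { this.conf = conf; }
}

class OpA extends SimpleOperator<DescA> {}

public class MiniOperatorFactory {
  // One (descriptor class, operator class) pair, mirroring opTuple.
  static final class Tuple<T extends Serializable> {
    final Class<T> descClass;
    final Class<? extends SimpleOperator<T>> opClass;
    Tuple(Class<T> descClass, Class<? extends SimpleOperator<T>> opClass) {
      this.descClass = descClass;
      this.opClass = opClass;
    }
  }

  static final List<Tuple<?>> registry = new ArrayList<Tuple<?>>();
  static {
    registry.add(new Tuple<DescA>(DescA.class, OpA.class));
  }

  // Linear scan of the registry, as in OperatorFactory.get(Class).
  @SuppressWarnings("unchecked")
  static <T extends Serializable> SimpleOperator<T> get(Class<T> descClass) {
    for (Tuple<?> t : registry) {
      if (t.descClass == descClass) {
        try {
          return (SimpleOperator<T>) t.opClass.newInstance();
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      }
    }
    throw new RuntimeException("No operator for descriptor class "
        + descClass.getName());
  }

  public static void main(String[] args) {
    SimpleOperator<DescA> op = get(DescA.class);
    op.setConf(new DescA());
    System.out.println("Created " + op.getClass().getSimpleName());
  }
}
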
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RecordReader.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RecordReader.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RecordReader.java Thu Jan 21 10:37:58 2010
@@ -25,10 +25,10 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
-
public interface RecordReader {
- public void initialize(InputStream in, Configuration conf, Properties tbl) throws IOException;
+ public void initialize(InputStream in, Configuration conf, Properties tbl)
+ throws IOException;
public Writable createRow() throws IOException;
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RecordWriter.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RecordWriter.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RecordWriter.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RecordWriter.java Thu Jan 21 10:37:58 2010
@@ -24,10 +24,12 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
-
public interface RecordWriter {
- public void initialize(OutputStream in, Configuration conf) throws IOException;
+ public void initialize(OutputStream in, Configuration conf)
+ throws IOException;
+
public void write(Writable row) throws IOException;
+
public void close() throws IOException;
}
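
RecordReader and RecordWriter (this file and the previous one) are the small interfaces ScriptOperator uses to stream rows to and from the user's script process; the wiring appears further down in this commit, where conf.getInRecordWriterClass().newInstance() is created and initialized with the script's stdin. Purely as an illustration, a hypothetical implementation that writes one text line per row could look like the sketch below; the LineTextRecordWriter name is made up, and it assumes the rows handed to write() are org.apache.hadoop.io.Text.

import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.RecordWriter;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

// Hypothetical RecordWriter: emits each row as UTF-8 text plus a newline.
public class LineTextRecordWriter implements RecordWriter {

  private OutputStream out;

  public void initialize(OutputStream out, Configuration conf)
      throws IOException {
    this.out = out;
  }

  public void write(Writable row) throws IOException {
    Text text = (Text) row; // assumes the serializer produced Text rows
    out.write(text.getBytes(), 0, text.getLength());
    out.write('\n');
  }

  public void close() throws IOException {
    out.flush();
    out.close();
  }
}
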
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Thu Jan 21 10:37:58 2010
@@ -42,69 +42,73 @@
/**
* Reduce Sink Operator sends output to the reduce stage
**/
-public class ReduceSinkOperator extends TerminalOperator <reduceSinkDesc> implements Serializable {
+public class ReduceSinkOperator extends TerminalOperator<reduceSinkDesc>
+ implements Serializable {
private static final long serialVersionUID = 1L;
/**
- * The evaluators for the key columns.
- * Key columns decide the sort order on the reducer side.
- * Key columns are passed to the reducer in the "key".
+ * The evaluators for the key columns. Key columns decide the sort order on
+ * the reducer side. Key columns are passed to the reducer in the "key".
*/
transient protected ExprNodeEvaluator[] keyEval;
/**
- * The evaluators for the value columns.
- * Value columns are passed to reducer in the "value".
+ * The evaluators for the value columns. Value columns are passed to reducer
+ * in the "value".
*/
transient protected ExprNodeEvaluator[] valueEval;
/**
- * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in Hive language).
- * Partition columns decide the reducer that the current row goes to.
- * Partition columns are not passed to reducer.
+ * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in
+ * Hive language). Partition columns decide the reducer that the current row
+ * goes to. Partition columns are not passed to reducer.
*/
transient protected ExprNodeEvaluator[] partitionEval;
-
- // TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is ready
+
+ // TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is
+ // ready
transient Serializer keySerializer;
transient boolean keyIsText;
transient Serializer valueSerializer;
transient int tag;
transient byte[] tagByte = new byte[1];
-
+
+ @Override
protected void initializeOp(Configuration hconf) throws HiveException {
try {
keyEval = new ExprNodeEvaluator[conf.getKeyCols().size()];
- int i=0;
- for(exprNodeDesc e: conf.getKeyCols()) {
+ int i = 0;
+ for (exprNodeDesc e : conf.getKeyCols()) {
keyEval[i++] = ExprNodeEvaluatorFactory.get(e);
}
valueEval = new ExprNodeEvaluator[conf.getValueCols().size()];
- i=0;
- for(exprNodeDesc e: conf.getValueCols()) {
+ i = 0;
+ for (exprNodeDesc e : conf.getValueCols()) {
valueEval[i++] = ExprNodeEvaluatorFactory.get(e);
}
partitionEval = new ExprNodeEvaluator[conf.getPartitionCols().size()];
- i=0;
- for(exprNodeDesc e: conf.getPartitionCols()) {
+ i = 0;
+ for (exprNodeDesc e : conf.getPartitionCols()) {
partitionEval[i++] = ExprNodeEvaluatorFactory.get(e);
}
tag = conf.getTag();
- tagByte[0] = (byte)tag;
+ tagByte[0] = (byte) tag;
LOG.info("Using tag = " + tag);
tableDesc keyTableDesc = conf.getKeySerializeInfo();
- keySerializer = (Serializer)keyTableDesc.getDeserializerClass().newInstance();
+ keySerializer = (Serializer) keyTableDesc.getDeserializerClass()
+ .newInstance();
keySerializer.initialize(null, keyTableDesc.getProperties());
keyIsText = keySerializer.getSerializedClass().equals(Text.class);
-
+
tableDesc valueTableDesc = conf.getValueSerializeInfo();
- valueSerializer = (Serializer)valueTableDesc.getDeserializerClass().newInstance();
+ valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
+ .newInstance();
valueSerializer.initialize(null, valueTableDesc.getProperties());
-
+
firstRow = true;
initializeChildren(hconf);
} catch (Exception e) {
@@ -116,65 +120,72 @@
transient InspectableObject tempInspectableObject = new InspectableObject();
transient HiveKey keyWritable = new HiveKey();
transient Writable value;
-
+
transient StructObjectInspector keyObjectInspector;
transient StructObjectInspector valueObjectInspector;
transient ObjectInspector[] partitionObjectInspectors;
transient Object[] cachedKeys;
transient Object[] cachedValues;
-
+
boolean firstRow;
-
+
transient Random random;
+
+ @Override
public void processOp(Object row, int tag) throws HiveException {
try {
ObjectInspector rowInspector = inputObjInspectors[tag];
if (firstRow) {
firstRow = false;
- keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval, conf.getOutputKeyColumnNames(), rowInspector);
- valueObjectInspector = initEvaluatorsAndReturnStruct(valueEval, conf.getOutputValueColumnNames(), rowInspector);
+ keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval, conf
+ .getOutputKeyColumnNames(), rowInspector);
+ valueObjectInspector = initEvaluatorsAndReturnStruct(valueEval, conf
+ .getOutputValueColumnNames(), rowInspector);
partitionObjectInspectors = initEvaluators(partitionEval, rowInspector);
cachedKeys = new Object[keyEval.length];
cachedValues = new Object[valueEval.length];
}
-
-
+
// Evaluate the keys
- for (int i=0; i<keyEval.length; i++) {
+ for (int i = 0; i < keyEval.length; i++) {
cachedKeys[i] = keyEval[i].evaluate(row);
}
-
+
// Serialize the keys and append the tag
if (keyIsText) {
- Text key = (Text)keySerializer.serialize(cachedKeys, keyObjectInspector);
+ Text key = (Text) keySerializer.serialize(cachedKeys,
+ keyObjectInspector);
if (tag == -1) {
keyWritable.set(key.getBytes(), 0, key.getLength());
} else {
int keyLength = key.getLength();
- keyWritable.setSize(keyLength+1);
+ keyWritable.setSize(keyLength + 1);
System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
keyWritable.get()[keyLength] = tagByte[0];
}
} else {
// Must be BytesWritable
- BytesWritable key = (BytesWritable)keySerializer.serialize(cachedKeys, keyObjectInspector);
+ BytesWritable key = (BytesWritable) keySerializer.serialize(cachedKeys,
+ keyObjectInspector);
if (tag == -1) {
keyWritable.set(key.get(), 0, key.getSize());
} else {
int keyLength = key.getSize();
- keyWritable.setSize(keyLength+1);
+ keyWritable.setSize(keyLength + 1);
System.arraycopy(key.get(), 0, keyWritable.get(), 0, keyLength);
keyWritable.get()[keyLength] = tagByte[0];
}
}
-
+
// Set the HashCode
int keyHashCode = 0;
if (partitionEval.length == 0) {
- // If no partition cols, just distribute the data uniformly to provide better
- // load balance. If the requirement is to have a single reducer, we should set
+ // If no partition cols, just distribute the data uniformly to provide
+ // better
+ // load balance. If the requirement is to have a single reducer, we
+ // should set
// the number of reducers to 1.
// Use a constant seed to make the code deterministic.
if (random == null) {
@@ -184,47 +195,50 @@
} else {
for (int i = 0; i < partitionEval.length; i++) {
Object o = partitionEval[i].evaluate(row);
- keyHashCode = keyHashCode * 31
- + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
+ keyHashCode = keyHashCode * 31
+ + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
}
}
keyWritable.setHashCode(keyHashCode);
-
+
// Evaluate the value
- for (int i=0; i<valueEval.length; i++) {
+ for (int i = 0; i < valueEval.length; i++) {
cachedValues[i] = valueEval[i].evaluate(row);
}
// Serialize the value
value = valueSerializer.serialize(cachedValues, valueObjectInspector);
-
+
} catch (SerDeException e) {
throw new HiveException(e);
}
-
+
try {
if (out != null) {
out.collect(keyWritable, value);
- // Since this is a terminal operator, update counters explicitly - forward is not called
+ // Since this is a terminal operator, update counters explicitly -
+ // forward is not called
if (counterNameToEnum != null) {
- ++this.outputRows;
- if (this.outputRows % 1000 == 0) {
+ ++outputRows;
+ if (outputRows % 1000 == 0) {
incrCounter(numOutputRowsCntr, outputRows);
- this.outputRows = 0;
+ outputRows = 0;
}
}
}
} catch (IOException e) {
- throw new HiveException (e);
+ throw new HiveException(e);
}
}
/**
* @return the name of the operator
*/
+ @Override
public String getName() {
return new String("RS");
}
-
+
+ @Override
public int getType() {
return OperatorType.REDUCESINK;
}
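
For context on the partitioning logic in the processOp hunk above: when DISTRIBUTE BY / CLUSTER BY columns are present, the operator folds their hash codes together with a multiply-by-31 combine and stores the result on the HiveKey; when there are none, it falls back to a seeded Random so rows spread evenly. A standalone sketch of that rule is below; the column values are made up, plain Object.hashCode stands in for ObjectInspectorUtils.hashCode, and the explicit modulo stands in for the MapReduce partitioner that does the actual reducer assignment.

import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Illustrative: combine per-column hashes as in ReduceSinkOperator
// (hash = hash * 31 + columnHash), then map the result to a reducer index.
public class PartitionHashSketch {

  static int combinedHash(List<Object> partitionCols) {
    int hash = 0;
    for (Object col : partitionCols) {
      hash = hash * 31 + (col == null ? 0 : col.hashCode());
    }
    return hash;
  }

  public static void main(String[] args) {
    int numReducers = 4;

    // Rows with the same DISTRIBUTE BY values land on the same reducer.
    List<Object> rowA = Arrays.<Object>asList("user_17", 2010);
    List<Object> rowB = Arrays.<Object>asList("user_17", 2010);
    List<Object> rowC = Arrays.<Object>asList("user_42", 2010);

    for (List<Object> row : Arrays.asList(rowA, rowB, rowC)) {
      int reducer = (combinedHash(row) & Integer.MAX_VALUE) % numReducers;
      System.out.println(row + " -> reducer " + reducer);
    }

    // No partition columns: a seeded Random distributes rows uniformly.
    Random random = new Random(12345);
    System.out.println("no partition cols -> reducer "
        + (random.nextInt() & Integer.MAX_VALUE) % numReducers);
  }
}
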
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java Thu Jan 21 10:37:58 2010
@@ -18,8 +18,8 @@
package org.apache.hadoop.hive.ql.exec;
-import java.util.*;
-import java.io.*;
+import java.io.Serializable;
+import java.util.Vector;
/**
* RowSchema Implementation
@@ -30,7 +30,8 @@
private static final long serialVersionUID = 1L;
private Vector<ColumnInfo> signature;
- public RowSchema() {}
+ public RowSchema() {
+ }
public RowSchema(Vector<ColumnInfo> signature) {
this.signature = signature;
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java Thu Jan 21 10:37:58 2010
@@ -24,7 +24,6 @@
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
@@ -48,15 +47,17 @@
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.StringUtils;
-
-public class ScriptOperator extends Operator<scriptDesc> implements Serializable {
+public class ScriptOperator extends Operator<scriptDesc> implements
+ Serializable {
private static final long serialVersionUID = 1L;
+ public static enum Counter {
+ DESERIALIZE_ERRORS, SERIALIZE_ERRORS
+ }
- public static enum Counter {DESERIALIZE_ERRORS, SERIALIZE_ERRORS}
- transient private LongWritable deserialize_error_count = new LongWritable ();
- transient private LongWritable serialize_error_count = new LongWritable ();
+ transient private final LongWritable deserialize_error_count = new LongWritable();
+ transient private final LongWritable serialize_error_count = new LongWritable();
transient Thread outThread = null;
transient Thread errThread = null;
@@ -69,14 +70,15 @@
transient volatile Throwable scriptError = null;
transient RecordWriter scriptOutWriter = null;
- static final String IO_EXCEPTION_BROKEN_PIPE_STRING= "Broken pipe";
+ static final String IO_EXCEPTION_BROKEN_PIPE_STRING = "Broken pipe";
/**
* sends periodic reports back to the tracker.
*/
transient AutoProgressor autoProgressor;
- // first row - the process should only be started if necessary, as it may conflict with some
+ // first row - the process should only be started if necessary, as it may
+ // conflict with some
// of the user assumptions.
transient boolean firstRow;
@@ -89,7 +91,8 @@
for (int i = 0; i < len; i++) {
char c = var.charAt(i);
char s;
- if ((c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')) {
+ if ((c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z')
+ || (c >= 'a' && c <= 'z')) {
s = c;
} else {
s = '_';
@@ -99,35 +102,33 @@
return safe.toString();
}
- static void addJobConfToEnvironment(Configuration conf, Map<String, String> env) {
+ static void addJobConfToEnvironment(Configuration conf,
+ Map<String, String> env) {
Iterator<Map.Entry<String, String>> it = conf.iterator();
while (it.hasNext()) {
- Map.Entry<String, String> en = (Map.Entry<String, String>) it.next();
- String name = (String) en.getKey();
- //String value = (String)en.getValue(); // does not apply variable expansion
+ Map.Entry<String, String> en = it.next();
+ String name = en.getKey();
+ // String value = (String)en.getValue(); // does not apply variable
+ // expansion
String value = conf.get(name); // does variable expansion
name = safeEnvVarName(name);
env.put(name, value);
}
}
-
/**
- * Maps a relative pathname to an absolute pathname using the
- * PATH enviroment.
+ * Maps a relative pathname to an absolute pathname using the PATH enviroment.
*/
- public class PathFinder
- {
- String pathenv; // a string of pathnames
- String pathSep; // the path seperator
- String fileSep; // the file seperator in a directory
+ public class PathFinder {
+ String pathenv; // a string of pathnames
+ String pathSep; // the path seperator
+ String fileSep; // the file seperator in a directory
/**
- * Construct a PathFinder object using the path from
- * the specified system environment variable.
+ * Construct a PathFinder object using the path from the specified system
+ * environment variable.
*/
- public PathFinder(String envpath)
- {
+ public PathFinder(String envpath) {
pathenv = System.getenv(envpath);
pathSep = System.getProperty("path.separator");
fileSep = System.getProperty("file.separator");
@@ -136,25 +137,22 @@
/**
* Appends the specified component to the path list
*/
- public void prependPathComponent(String str)
- {
+ public void prependPathComponent(String str) {
pathenv = str + pathSep + pathenv;
}
/**
- * Returns the full path name of this file if it is listed in the
- * path
+ * Returns the full path name of this file if it is listed in the path
*/
- public File getAbsolutePath(String filename)
- {
- if (pathenv == null || pathSep == null || fileSep == null) {
+ public File getAbsolutePath(String filename) {
+ if (pathenv == null || pathSep == null || fileSep == null) {
return null;
}
- int val = -1;
- String classvalue = pathenv + pathSep;
+ int val = -1;
+ String classvalue = pathenv + pathSep;
- while (((val = classvalue.indexOf(pathSep)) >= 0) &&
- classvalue.length() > 0) {
+ while (((val = classvalue.indexOf(pathSep)) >= 0)
+ && classvalue.length() > 0) {
//
// Extract each entry from the pathenv
//
@@ -170,18 +168,20 @@
f = new File(entry + fileSep + filename);
}
//
- // see if the filename matches and we can read it
+ // see if the filename matches and we can read it
//
if (f.isFile() && f.canRead()) {
return f;
}
- } catch (Exception exp){ }
- classvalue = classvalue.substring(val+1).trim();
+ } catch (Exception exp) {
+ }
+ classvalue = classvalue.substring(val + 1).trim();
}
return null;
}
}
+ @Override
protected void initializeOp(Configuration hconf) throws HiveException {
firstRow = true;
@@ -191,18 +191,22 @@
try {
this.hconf = hconf;
- scriptOutputDeserializer = conf.getScriptOutputInfo().getDeserializerClass().newInstance();
- scriptOutputDeserializer.initialize(hconf, conf.getScriptOutputInfo().getProperties());
-
- scriptInputSerializer = (Serializer)conf.getScriptInputInfo().getDeserializerClass().newInstance();
- scriptInputSerializer.initialize(hconf, conf.getScriptInputInfo().getProperties());
+ scriptOutputDeserializer = conf.getScriptOutputInfo()
+ .getDeserializerClass().newInstance();
+ scriptOutputDeserializer.initialize(hconf, conf.getScriptOutputInfo()
+ .getProperties());
+
+ scriptInputSerializer = (Serializer) conf.getScriptInputInfo()
+ .getDeserializerClass().newInstance();
+ scriptInputSerializer.initialize(hconf, conf.getScriptInputInfo()
+ .getProperties());
outputObjInspector = scriptOutputDeserializer.getObjectInspector();
// initialize all children before starting the script
initializeChildren(hconf);
} catch (Exception e) {
- throw new HiveException ("Cannot initialize ScriptOperator", e);
+ throw new HiveException("Cannot initialize ScriptOperator", e);
}
}
@@ -215,17 +219,20 @@
}
void displayBrokenPipeInfo() {
- LOG.info("The script did not consume all input data. This is considered as an error.");
- LOG.info("set " + HiveConf.ConfVars.ALLOWPARTIALCONSUMP.toString() + "=true; to ignore it.");
+ LOG
+ .info("The script did not consume all input data. This is considered as an error.");
+ LOG.info("set " + HiveConf.ConfVars.ALLOWPARTIALCONSUMP.toString()
+ + "=true; to ignore it.");
return;
}
+ @Override
public void processOp(Object row, int tag) throws HiveException {
// initialize the user's process only when you recieve the first row
if (firstRow) {
firstRow = false;
try {
- String [] cmdArgs = splitArgs(conf.getScriptCmd());
+ String[] cmdArgs = splitArgs(conf.getScriptCmd());
String prog = cmdArgs[0];
File currentDir = new File(".").getAbsoluteFile();
@@ -240,64 +247,78 @@
f = null;
}
- String [] wrappedCmdArgs = addWrapper(cmdArgs);
+ String[] wrappedCmdArgs = addWrapper(cmdArgs);
LOG.info("Executing " + Arrays.asList(wrappedCmdArgs));
- LOG.info("tablename=" + hconf.get(HiveConf.ConfVars.HIVETABLENAME.varname));
- LOG.info("partname=" + hconf.get(HiveConf.ConfVars.HIVEPARTITIONNAME.varname));
+ LOG.info("tablename="
+ + hconf.get(HiveConf.ConfVars.HIVETABLENAME.varname));
+ LOG.info("partname="
+ + hconf.get(HiveConf.ConfVars.HIVEPARTITIONNAME.varname));
LOG.info("alias=" + alias);
ProcessBuilder pb = new ProcessBuilder(wrappedCmdArgs);
Map<String, String> env = pb.environment();
addJobConfToEnvironment(hconf, env);
- env.put(safeEnvVarName(HiveConf.ConfVars.HIVEALIAS.varname), String.valueOf(alias));
+ env.put(safeEnvVarName(HiveConf.ConfVars.HIVEALIAS.varname), String
+ .valueOf(alias));
- // Create an environment variable that uniquely identifies this script operator
- String idEnvVarName = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVESCRIPTIDENVVAR);
- String idEnvVarVal = this.getOperatorId();
+ // Create an environment variable that uniquely identifies this script
+ // operator
+ String idEnvVarName = HiveConf.getVar(hconf,
+ HiveConf.ConfVars.HIVESCRIPTIDENVVAR);
+ String idEnvVarVal = getOperatorId();
env.put(safeEnvVarName(idEnvVarName), idEnvVarVal);
- scriptPid = pb.start(); // Runtime.getRuntime().exec(wrappedCmdArgs);
+ scriptPid = pb.start(); // Runtime.getRuntime().exec(wrappedCmdArgs);
- DataOutputStream scriptOut = new DataOutputStream(new BufferedOutputStream(scriptPid.getOutputStream()));
- DataInputStream scriptIn = new DataInputStream(new BufferedInputStream(scriptPid.getInputStream()));
- DataInputStream scriptErr = new DataInputStream(new BufferedInputStream(scriptPid.getErrorStream()));
+ DataOutputStream scriptOut = new DataOutputStream(
+ new BufferedOutputStream(scriptPid.getOutputStream()));
+ DataInputStream scriptIn = new DataInputStream(new BufferedInputStream(
+ scriptPid.getInputStream()));
+ DataInputStream scriptErr = new DataInputStream(
+ new BufferedInputStream(scriptPid.getErrorStream()));
scriptOutWriter = conf.getInRecordWriterClass().newInstance();
scriptOutWriter.initialize(scriptOut, hconf);
- RecordReader scriptOutputReader = conf.getOutRecordReaderClass().newInstance();
- scriptOutputReader.initialize(scriptIn, hconf, conf.getScriptOutputInfo().getProperties());
-
- outThread = new StreamThread(scriptOutputReader, new OutputStreamProcessor(
- scriptOutputDeserializer.getObjectInspector()), "OutputProcessor");
-
- RecordReader scriptErrReader = conf.getOutRecordReaderClass().newInstance();
- scriptErrReader.initialize(scriptErr, hconf, conf.getScriptOutputInfo().getProperties());
-
- errThread = new StreamThread(scriptErrReader,
- new ErrorStreamProcessor
- (HiveConf.getIntVar(hconf, HiveConf.ConfVars.SCRIPTERRORLIMIT)),
- "ErrorProcessor");
-
- if (HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVESCRIPTAUTOPROGRESS)) {
- autoProgressor = new AutoProgressor(this.getClass().getName(), reporter,
- Utilities.getDefaultNotificationInterval(hconf));
+ RecordReader scriptOutputReader = conf.getOutRecordReaderClass()
+ .newInstance();
+ scriptOutputReader.initialize(scriptIn, hconf, conf
+ .getScriptOutputInfo().getProperties());
+
+ outThread = new StreamThread(scriptOutputReader,
+ new OutputStreamProcessor(scriptOutputDeserializer
+ .getObjectInspector()), "OutputProcessor");
+
+ RecordReader scriptErrReader = conf.getOutRecordReaderClass()
+ .newInstance();
+ scriptErrReader.initialize(scriptErr, hconf, conf.getScriptOutputInfo()
+ .getProperties());
+
+ errThread = new StreamThread(scriptErrReader, new ErrorStreamProcessor(
+ HiveConf.getIntVar(hconf, HiveConf.ConfVars.SCRIPTERRORLIMIT)),
+ "ErrorProcessor");
+
+ if (HiveConf
+ .getBoolVar(hconf, HiveConf.ConfVars.HIVESCRIPTAUTOPROGRESS)) {
+ autoProgressor = new AutoProgressor(this.getClass().getName(),
+ reporter, Utilities.getDefaultNotificationInterval(hconf));
autoProgressor.go();
}
outThread.start();
errThread.start();
} catch (Exception e) {
- throw new HiveException ("Cannot initialize ScriptOperator", e);
+ throw new HiveException("Cannot initialize ScriptOperator", e);
}
}
- if(scriptError != null) {
+ if (scriptError != null) {
throw new HiveException(scriptError);
}
try {
- Writable res = scriptInputSerializer.serialize(row, inputObjInspectors[tag]);
+ Writable res = scriptInputSerializer.serialize(row,
+ inputObjInspectors[tag]);
scriptOutWriter.write(res);
} catch (SerDeException e) {
LOG.error("Error in serializing the row: " + e.getMessage());
@@ -305,12 +326,13 @@
serialize_error_count.set(serialize_error_count.get() + 1);
throw new HiveException(e);
} catch (IOException e) {
- if(isBrokenPipeException(e) && allowPartialConsumption()) {
+ if (isBrokenPipeException(e) && allowPartialConsumption()) {
setDone(true);
- LOG.warn("Got broken pipe during write: ignoring exception and setting operator to done");
+ LOG
+ .warn("Got broken pipe during write: ignoring exception and setting operator to done");
} else {
LOG.error("Error in writing to script: " + e.getMessage());
- if(isBrokenPipeException(e)) {
+ if (isBrokenPipeException(e)) {
displayBrokenPipeInfo();
}
scriptError = e;
@@ -319,40 +341,45 @@
}
}
+ @Override
public void close(boolean abort) throws HiveException {
boolean new_abort = abort;
- if(!abort) {
- if(scriptError != null) {
+ if (!abort) {
+ if (scriptError != null) {
throw new HiveException(scriptError);
}
// everything ok. try normal shutdown
try {
try {
- if (scriptOutWriter != null)
+ if (scriptOutWriter != null) {
scriptOutWriter.close();
+ }
} catch (IOException e) {
- if(isBrokenPipeException(e) && allowPartialConsumption()) {
+ if (isBrokenPipeException(e) && allowPartialConsumption()) {
LOG.warn("Got broken pipe: ignoring exception");
} else {
- if(isBrokenPipeException(e)) {
+ if (isBrokenPipeException(e)) {
displayBrokenPipeInfo();
}
throw e;
}
}
int exitVal = 0;
- if (scriptPid != null)
+ if (scriptPid != null) {
exitVal = scriptPid.waitFor();
+ }
if (exitVal != 0) {
LOG.error("Script failed with code " + exitVal);
new_abort = true;
- };
+ }
+ ;
} catch (IOException e) {
LOG.error("Got ioexception: " + e.getMessage());
e.printStackTrace();
new_abort = true;
- } catch (InterruptedException e) { }
+ } catch (InterruptedException e) {
+ }
} else {
@@ -362,16 +389,17 @@
// Interrupt the current thread after 1 second
final Thread mythread = Thread.currentThread();
Timer timer = new Timer(true);
- timer.schedule(new TimerTask(){
+ timer.schedule(new TimerTask() {
@Override
public void run() {
mythread.interrupt();
- }},
- 1000);
+ }
+ }, 1000);
// Wait for the child process to finish
int exitVal = 0;
- if (scriptPid != null)
+ if (scriptPid != null) {
scriptPid.waitFor();
+ }
// Cancel the timer
timer.cancel();
// Output the exit code
@@ -384,74 +412,82 @@
// try these best effort
try {
- if (outThread != null)
+ if (outThread != null) {
outThread.join(0);
+ }
} catch (Exception e) {
- LOG.warn("Exception in closing outThread: " + StringUtils.stringifyException(e));
+ LOG.warn("Exception in closing outThread: "
+ + StringUtils.stringifyException(e));
}
try {
- if (errThread != null)
+ if (errThread != null) {
errThread.join(0);
+ }
} catch (Exception e) {
- LOG.warn("Exception in closing errThread: " + StringUtils.stringifyException(e));
+ LOG.warn("Exception in closing errThread: "
+ + StringUtils.stringifyException(e));
}
try {
- if (scriptPid != null)
+ if (scriptPid != null) {
scriptPid.destroy();
+ }
} catch (Exception e) {
- LOG.warn("Exception in destroying scriptPid: " + StringUtils.stringifyException(e));
+ LOG.warn("Exception in destroying scriptPid: "
+ + StringUtils.stringifyException(e));
}
super.close(new_abort);
- if(new_abort && !abort) {
- throw new HiveException ("Hit error while closing ..");
+ if (new_abort && !abort) {
+ throw new HiveException("Hit error while closing ..");
}
}
-
interface StreamProcessor {
public void processLine(Writable line) throws HiveException;
+
public void close() throws HiveException;
}
-
class OutputStreamProcessor implements StreamProcessor {
Object row;
ObjectInspector rowInspector;
+
public OutputStreamProcessor(ObjectInspector rowInspector) {
this.rowInspector = rowInspector;
}
+
public void processLine(Writable line) throws HiveException {
try {
row = scriptOutputDeserializer.deserialize(line);
} catch (SerDeException e) {
- deserialize_error_count.set(deserialize_error_count.get()+1);
+ deserialize_error_count.set(deserialize_error_count.get() + 1);
return;
}
forward(row, rowInspector);
}
+
public void close() {
}
}
/**
* The processor for stderr stream.
- *
- * TODO: In the future when we move to hadoop 0.18 and above, we should borrow the logic
- * from HadoopStreaming: PipeMapRed.java MRErrorThread to support counters and status
- * updates.
+ *
+ * TODO: In the future when we move to hadoop 0.18 and above, we should borrow
+ * the logic from HadoopStreaming: PipeMapRed.java MRErrorThread to support
+ * counters and status updates.
*/
class ErrorStreamProcessor implements StreamProcessor {
private long bytesCopied = 0;
- private long maxBytes;
+ private final long maxBytes;
private long lastReportTime;
- public ErrorStreamProcessor (int maxBytes) {
- this.maxBytes = (long)maxBytes;
+ public ErrorStreamProcessor(int maxBytes) {
+ this.maxBytes = maxBytes;
lastReportTime = 0;
}
@@ -460,12 +496,14 @@
String stringLine = line.toString();
int len = 0;
- if (line instanceof Text)
- len = ((Text)line).getLength();
- else if (line instanceof BytesWritable)
- len = ((BytesWritable)line).getSize();
+ if (line instanceof Text) {
+ len = ((Text) line).getLength();
+ } else if (line instanceof BytesWritable) {
+ len = ((BytesWritable) line).getSize();
+ }
- // Report progress for each stderr line, but no more frequently than once per minute.
+ // Report progress for each stderr line, but no more frequently than once
+ // per minute.
long now = System.currentTimeMillis();
// reporter is a member variable of the Operator class.
if (now - lastReportTime > 60 * 1000 && reporter != null) {
@@ -474,21 +512,22 @@
reporter.progress();
}
- if((maxBytes < 0) || (bytesCopied < maxBytes)) {
+ if ((maxBytes < 0) || (bytesCopied < maxBytes)) {
System.err.println(stringLine);
}
if (bytesCopied < maxBytes && bytesCopied + len >= maxBytes) {
System.err.println("Operator " + id + " " + getName()
- + ": exceeding stderr limit of " + maxBytes + " bytes, will truncate stderr messages.");
+ + ": exceeding stderr limit of " + maxBytes
+ + " bytes, will truncate stderr messages.");
}
bytesCopied += len;
}
+
public void close() {
}
}
-
class StreamThread extends Thread {
RecordReader in;
@@ -502,19 +541,20 @@
setDaemon(true);
}
+ @Override
public void run() {
try {
Writable row = in.createRow();
- while(true) {
+ while (true) {
long bytes = in.next(row);
- if(bytes <= 0) {
+ if (bytes <= 0) {
break;
}
proc.processLine(row);
}
- LOG.info("StreamThread "+name+" done");
+ LOG.info("StreamThread " + name + " done");
} catch (Throwable th) {
scriptError = th;
@@ -534,27 +574,26 @@
}
/**
- * Wrap the script in a wrapper that allows admins to control
+ * Wrap the script in a wrapper that allows admins to control
**/
- protected String [] addWrapper(String [] inArgs) {
+ protected String[] addWrapper(String[] inArgs) {
String wrapper = HiveConf.getVar(hconf, HiveConf.ConfVars.SCRIPTWRAPPER);
- if(wrapper == null) {
+ if (wrapper == null) {
return inArgs;
}
- String [] wrapComponents = splitArgs(wrapper);
+ String[] wrapComponents = splitArgs(wrapper);
int totallength = wrapComponents.length + inArgs.length;
- String [] finalArgv = new String [totallength];
- for(int i=0; i<wrapComponents.length; i++) {
+ String[] finalArgv = new String[totallength];
+ for (int i = 0; i < wrapComponents.length; i++) {
finalArgv[i] = wrapComponents[i];
}
- for(int i=0; i < inArgs.length; i++) {
- finalArgv[wrapComponents.length+i] = inArgs[i];
+ for (int i = 0; i < inArgs.length; i++) {
+ finalArgv[wrapComponents.length + i] = inArgs[i];
}
return (finalArgv);
}
-
// Code below shameless borrowed from Hadoop Streaming
public static String[] splitArgs(String args) {
@@ -612,6 +651,7 @@
return "SCR";
}
+ @Override
public int getType() {
return OperatorType.SCRIPT;
}
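
One detail worth calling out in the ScriptOperator hunk: addJobConfToEnvironment exports every job configuration entry into the script's environment after passing the key through safeEnvVarName, which keeps alphanumerics and turns every other character into an underscore. A self-contained sketch of that sanitizing rule, with illustrative sample keys, is:

// Standalone copy of the sanitizing rule shown in the hunk above: keep
// [0-9A-Za-z], replace anything else with '_', so a configuration key such
// as "mapred.task.id" becomes a legal shell environment variable name.
// The sample keys in main() are illustrative only.
public class SafeEnvVarNameSketch {

  static String safeEnvVarName(String var) {
    StringBuilder safe = new StringBuilder();
    int len = var.length();
    for (int i = 0; i < len; i++) {
      char c = var.charAt(i);
      char s;
      if ((c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z')
          || (c >= 'a' && c <= 'z')) {
        s = c;
      } else {
        s = '_';
      }
      safe.append(s);
    }
    return safe.toString();
  }

  public static void main(String[] args) {
    System.out.println(safeEnvVarName("mapred.task.id"));   // mapred_task_id
    System.out.println(safeEnvVarName("columns.types"));    // columns_types
  }
}
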
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java Thu Jan 21 10:37:58 2010
@@ -31,44 +31,47 @@
/**
* Select operator implementation
**/
-public class SelectOperator extends Operator <selectDesc> implements Serializable {
+public class SelectOperator extends Operator<selectDesc> implements
+ Serializable {
private static final long serialVersionUID = 1L;
transient protected ExprNodeEvaluator[] eval;
transient Object[] output;
-
+
+ @Override
protected void initializeOp(Configuration hconf) throws HiveException {
// Just forward the row as is
if (conf.isSelStarNoCompute()) {
- initializeChildren(hconf);
+ initializeChildren(hconf);
return;
}
-
+
ArrayList<exprNodeDesc> colList = conf.getColList();
eval = new ExprNodeEvaluator[colList.size()];
- for(int i=0; i<colList.size(); i++) {
- assert(colList.get(i) != null);
+ for (int i = 0; i < colList.size(); i++) {
+ assert (colList.get(i) != null);
eval[i] = ExprNodeEvaluatorFactory.get(colList.get(i));
}
-
+
output = new Object[eval.length];
- LOG.info("SELECT " + ((StructObjectInspector)inputObjInspectors[0]).getTypeName());
+ LOG.info("SELECT "
+ + ((StructObjectInspector) inputObjInspectors[0]).getTypeName());
outputObjInspector = initEvaluatorsAndReturnStruct(eval, conf
- .getOutputColumnNames(), inputObjInspectors[0]);
+ .getOutputColumnNames(), inputObjInspectors[0]);
initializeChildren(hconf);
}
- public void processOp(Object row, int tag)
- throws HiveException {
+ @Override
+ public void processOp(Object row, int tag) throws HiveException {
// Just forward the row as is
if (conf.isSelStarNoCompute()) {
forward(row, inputObjInspectors[tag]);
return;
}
-
- for(int i=0; i<eval.length; i++) {
+
+ for (int i = 0; i < eval.length; i++) {
try {
output[i] = eval[i].evaluate(row);
} catch (HiveException e) {
@@ -88,7 +91,8 @@
public String getName() {
return new String("SEL");
}
-
+
+ @Override
public int getType() {
return OperatorType.SELECT;
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SkewJoinHandler.java Thu Jan 21 10:37:58 2010
@@ -66,40 +66,40 @@
* Right now, we use one file per skew key.
*
* <p>
- * For more info, please see
- * https://issues.apache.org/jira/browse/HIVE-964.
+ * For more info, please see https://issues.apache.org/jira/browse/HIVE-964.
*
*/
public class SkewJoinHandler {
-
- static final protected Log LOG = LogFactory.getLog(SkewJoinHandler.class.getName());
-
+
+ static final protected Log LOG = LogFactory.getLog(SkewJoinHandler.class
+ .getName());
+
public int currBigKeyTag = -1;
-
+
private int rowNumber = 0;
private int currTag = -1;
-
+
private int skewKeyDefinition = -1;
- private Map<Byte,StructObjectInspector> skewKeysTableObjectInspector = null;
- private Map<Byte,SerDe> tblSerializers = null;
+ private Map<Byte, StructObjectInspector> skewKeysTableObjectInspector = null;
+ private Map<Byte, SerDe> tblSerializers = null;
private Map<Byte, tableDesc> tblDesc = null;
-
+
private Map<Byte, Boolean> bigKeysExistingMap = null;
-
+
Configuration hconf = null;
List<Object> dummyKey = null;
String taskId;
-
- private CommonJoinOperator<? extends Serializable> joinOp;
- private int numAliases;
- private joinDesc conf;
-
+
+ private final CommonJoinOperator<? extends Serializable> joinOp;
+ private final int numAliases;
+ private final joinDesc conf;
+
public SkewJoinHandler(CommonJoinOperator<? extends Serializable> joinOp) {
this.joinOp = joinOp;
- this.numAliases = joinOp.numAliases;
- this.conf = joinOp.getConf();
+ numAliases = joinOp.numAliases;
+ conf = joinOp.getConf();
}
-
+
public void initiliaze(Configuration hconf) {
this.hconf = hconf;
joinDesc desc = joinOp.getConf();
@@ -114,7 +114,7 @@
for (int i = 0; i < numAliases; i++) {
Byte alias = conf.getTagOrder()[i];
List<ObjectInspector> skewTableKeyInspectors = new ArrayList<ObjectInspector>();
- StructObjectInspector soi = (StructObjectInspector) this.joinOp.inputObjInspectors[alias];
+ StructObjectInspector soi = (StructObjectInspector) joinOp.inputObjInspectors[alias];
StructField sf = soi.getStructFieldRef(Utilities.ReduceField.KEY
.toString());
List<? extends StructField> keyFields = ((StructObjectInspector) sf
@@ -124,7 +124,8 @@
skewTableKeyInspectors.add(keyFields.get(k).getFieldObjectInspector());
}
tableDesc joinKeyDesc = desc.getKeyTableDesc();
- List<String> keyColNames = Utilities.getColumnNames(joinKeyDesc.getProperties());
+ List<String> keyColNames = Utilities.getColumnNames(joinKeyDesc
+ .getProperties());
StructObjectInspector structTblKeyInpector = ObjectInspectorFactory
.getStandardStructObjectInspector(keyColNames, skewTableKeyInspectors);
@@ -135,21 +136,23 @@
tblSerializers.put((byte) i, serializer);
} catch (SerDeException e) {
LOG.error("Skewjoin will be disabled due to " + e.getMessage(), e);
- this.joinOp.handleSkewJoin = false;
+ joinOp.handleSkewJoin = false;
break;
}
- tableDesc valTblDesc = this.joinOp.getSpillTableDesc(alias);
+ tableDesc valTblDesc = joinOp.getSpillTableDesc(alias);
List<String> valColNames = new ArrayList<String>();
- if (valTblDesc != null)
+ if (valTblDesc != null) {
valColNames = Utilities.getColumnNames(valTblDesc.getProperties());
+ }
StructObjectInspector structTblValInpector = ObjectInspectorFactory
.getStandardStructObjectInspector(valColNames,
- this.joinOp.joinValuesStandardObjectInspectors.get((byte) i));
+ joinOp.joinValuesStandardObjectInspectors.get((byte) i));
StructObjectInspector structTblInpector = ObjectInspectorFactory
- .getUnionStructObjectInspector(Arrays.asList(new StructObjectInspector[] {
- structTblValInpector,structTblKeyInpector }));
+ .getUnionStructObjectInspector(Arrays
+ .asList(new StructObjectInspector[] { structTblValInpector,
+ structTblKeyInpector }));
skewKeysTableObjectInspector.put((byte) i, structTblInpector);
}
@@ -165,50 +168,57 @@
}
}
}
-
+
void endGroup() throws IOException, HiveException {
- if(skewKeyInCurrentGroup) {
-
- String specPath = conf.getBigKeysDirMap().get((byte)currBigKeyTag);
- RowContainer<ArrayList<Object>> bigKey = joinOp.storage.get(Byte.valueOf((byte)currBigKeyTag));
- Path outputPath = getOperatorOutputPath(specPath);
+ if (skewKeyInCurrentGroup) {
+
+ String specPath = conf.getBigKeysDirMap().get((byte) currBigKeyTag);
+ RowContainer<ArrayList<Object>> bigKey = joinOp.storage.get(Byte
+ .valueOf((byte) currBigKeyTag));
+ Path outputPath = getOperatorOutputPath(specPath);
FileSystem destFs = outputPath.getFileSystem(hconf);
bigKey.copyToDFSDirecory(destFs, outputPath);
-
+
for (int i = 0; i < numAliases; i++) {
- if (((byte)i) == currBigKeyTag) continue;
- RowContainer<ArrayList<Object>> values = joinOp.storage.get(Byte.valueOf((byte)i));
- if(values != null) {
- specPath = conf.getSmallKeysDirMap().get((byte)currBigKeyTag).get((byte)i);
+ if (((byte) i) == currBigKeyTag) {
+ continue;
+ }
+ RowContainer<ArrayList<Object>> values = joinOp.storage.get(Byte
+ .valueOf((byte) i));
+ if (values != null) {
+ specPath = conf.getSmallKeysDirMap().get((byte) currBigKeyTag).get(
+ (byte) i);
values.copyToDFSDirecory(destFs, getOperatorOutputPath(specPath));
}
}
}
skewKeyInCurrentGroup = false;
}
-
+
boolean skewKeyInCurrentGroup = false;
+
public void handleSkew(int tag) throws HiveException {
- if(joinOp.newGroupStarted || tag != currTag) {
+ if (joinOp.newGroupStarted || tag != currTag) {
rowNumber = 0;
currTag = tag;
}
-
- if(joinOp.newGroupStarted) {
+
+ if (joinOp.newGroupStarted) {
currBigKeyTag = -1;
joinOp.newGroupStarted = false;
- dummyKey = (List<Object>)joinOp.getGroupKeyObject();
+ dummyKey = (List<Object>) joinOp.getGroupKeyObject();
skewKeyInCurrentGroup = false;
-
+
for (int i = 0; i < numAliases; i++) {
- RowContainer<ArrayList<Object>> rc = joinOp.storage.get(Byte.valueOf((byte)i));
- if(rc != null) {
+ RowContainer<ArrayList<Object>> rc = joinOp.storage.get(Byte
+ .valueOf((byte) i));
+ if (rc != null) {
rc.setKeyObject(dummyKey);
}
}
}
-
+
rowNumber++;
if (currBigKeyTag == -1 && (tag < numAliases - 1)
&& rowNumber >= skewKeyDefinition) {
@@ -216,14 +226,15 @@
// table (the last table can always be streamed), we define that we get
// a skew key now.
currBigKeyTag = tag;
-
+
// right now we assume that the group by is an ArrayList object. It may
// change in future.
- if(! (dummyKey instanceof List))
+ if (!(dummyKey instanceof List)) {
throw new RuntimeException("Bug in handle skew key in a seperate job.");
-
+ }
+
skewKeyInCurrentGroup = true;
- bigKeysExistingMap.put(Byte.valueOf((byte)currBigKeyTag), Boolean.TRUE);
+ bigKeysExistingMap.put(Byte.valueOf((byte) currBigKeyTag), Boolean.TRUE);
}
}
@@ -240,8 +251,9 @@
// if we did not see a skew key in this table, continue to next
// table
- if (!bigKeysExistingMap.get((byte) bigKeyTbl))
+ if (!bigKeysExistingMap.get((byte) bigKeyTbl)) {
continue;
+ }
try {
String specPath = conf.getBigKeysDirMap().get((byte) bigKeyTbl);
@@ -249,8 +261,9 @@
FileSystem fs = bigKeyPath.getFileSystem(hconf);
delete(bigKeyPath, fs);
for (int smallKeyTbl = 0; smallKeyTbl < numAliases; smallKeyTbl++) {
- if (((byte) smallKeyTbl) == bigKeyTbl)
+ if (((byte) smallKeyTbl) == bigKeyTbl) {
continue;
+ }
specPath = conf.getSmallKeysDirMap().get((byte) bigKeyTbl).get(
(byte) smallKeyTbl);
delete(getOperatorOutputPath(specPath), fs);
@@ -272,26 +285,30 @@
private void commit() throws IOException {
for (int bigKeyTbl = 0; bigKeyTbl < numAliases; bigKeyTbl++) {
-
+
// if we did not see a skew key in this table, continue to next table
// we are trying to avoid an extra call of FileSystem.exists()
- Boolean existing = bigKeysExistingMap.get(Byte.valueOf((byte)bigKeyTbl));
- if (existing == null || !existing)
+ Boolean existing = bigKeysExistingMap.get(Byte.valueOf((byte) bigKeyTbl));
+ if (existing == null || !existing) {
continue;
-
- String specPath = conf.getBigKeysDirMap().get(Byte.valueOf((byte) bigKeyTbl));
+ }
+
+ String specPath = conf.getBigKeysDirMap().get(
+ Byte.valueOf((byte) bigKeyTbl));
commitOutputPathToFinalPath(specPath, false);
for (int smallKeyTbl = 0; smallKeyTbl < numAliases; smallKeyTbl++) {
- if ( smallKeyTbl == bigKeyTbl)
+ if (smallKeyTbl == bigKeyTbl) {
continue;
- specPath = conf.getSmallKeysDirMap().get(Byte.valueOf((byte) bigKeyTbl)).get(
- Byte.valueOf((byte) smallKeyTbl));
+ }
+ specPath = conf.getSmallKeysDirMap()
+ .get(Byte.valueOf((byte) bigKeyTbl)).get(
+ Byte.valueOf((byte) smallKeyTbl));
// the file may not exist, and we just ignore this
commitOutputPathToFinalPath(specPath, true);
}
}
}
-
+
private void commitOutputPathToFinalPath(String specPath,
boolean ignoreNonExisting) throws IOException {
Path outPath = getOperatorOutputPath(specPath);
@@ -304,23 +321,25 @@
throw new IOException("Unable to rename output to: " + finalPath);
}
} catch (FileNotFoundException e) {
- if (!ignoreNonExisting)
+ if (!ignoreNonExisting) {
throw e;
+ }
} catch (IOException e) {
- if (!fs.exists(outPath) && ignoreNonExisting)
+ if (!fs.exists(outPath) && ignoreNonExisting) {
return;
+ }
throw e;
}
}
-
+
private Path getOperatorOutputPath(String specPath) throws IOException {
Path tmpPath = Utilities.toTempPath(specPath);
return new Path(tmpPath, Utilities.toTempPath(taskId));
}
-
+
private Path getOperatorFinalPath(String specPath) throws IOException {
Path tmpPath = Utilities.toTempPath(specPath);
return new Path(tmpPath, taskId);
}
-
+
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java Thu Jan 21 10:37:58 2010
@@ -25,32 +25,37 @@
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
/**
- * Table Scan Operator
- * If the data is coming from the map-reduce framework, just forward it.
- * This will be needed as part of local work when data is not being read as part of map-reduce framework
+ * Table Scan Operator If the data is coming from the map-reduce framework, just
+ * forward it. This will be needed as part of local work when data is not being
+ * read as part of map-reduce framework
**/
-public class TableScanOperator extends Operator<tableScanDesc> implements Serializable {
+public class TableScanOperator extends Operator<tableScanDesc> implements
+ Serializable {
private static final long serialVersionUID = 1L;
/**
- * Currently, the table scan operator does not do anything special other than just forwarding the row. Since the
- * table data is always read as part of the map-reduce framework by the mapper. But, this assumption is not true,
- * i.e table data is not only read by the mapper, this operator will be enhanced to read the table.
+ * Currently, the table scan operator does not do anything special other than
+ * just forwarding the row. Since the table data is always read as part of the
+ * map-reduce framework by the mapper. But, this assumption is not true, i.e
+ * table data is not only read by the mapper, this operator will be enhanced
+ * to read the table.
**/
@Override
- public void processOp(Object row, int tag)
- throws HiveException {
- forward(row, inputObjInspectors[tag]);
+ public void processOp(Object row, int tag) throws HiveException {
+ forward(row, inputObjInspectors[tag]);
}
/**
- * The operator name for this operator type. This is used to construct the rule for an operator
+ * The operator name for this operator type. This is used to construct the
+ * rule for an operator
+ *
* @return the operator name
**/
+ @Override
public String getName() {
return new String("TS");
}
-
+
// this 'neededColumnIDs' field is included in this operator class instead of
// its desc class.The reason is that 1)tableScanDesc can not be instantiated,
// and 2) it will fail some join and union queries if this is added forcibly
@@ -58,13 +63,14 @@
java.util.ArrayList<Integer> neededColumnIDs;
public void setNeededColumnIDs(java.util.ArrayList<Integer> orign_columns) {
- this.neededColumnIDs = orign_columns;
+ neededColumnIDs = orign_columns;
}
public java.util.ArrayList<Integer> getNeededColumnIDs() {
return neededColumnIDs;
}
+ @Override
public int getType() {
return OperatorType.TABLESCAN;
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java Thu Jan 21 10:37:58 2010
@@ -18,9 +18,16 @@
package org.apache.hadoop.hive.ql.exec;
-import java.io.*;
-import java.util.*;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.QueryPlan;
@@ -31,15 +38,12 @@
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.util.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-
/**
* Task implementation
**/
-public abstract class Task <T extends Serializable> implements Serializable, Node {
+public abstract class Task<T extends Serializable> implements Serializable,
+ Node {
private static final long serialVersionUID = 1L;
transient protected boolean started;
@@ -68,7 +72,8 @@
this.taskCounters = new HashMap<String, Long>();
}
- public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
+ public void initialize(HiveConf conf, QueryPlan queryPlan,
+ DriverContext driverContext) {
this.queryPlan = queryPlan;
isdone = false;
started = false;
@@ -76,21 +81,22 @@
this.conf = conf;
try {
- db = Hive.get(conf);
+ db = Hive.get(conf);
} catch (HiveException e) {
// Bail out ungracefully - we should never hit
// this here - but would have hit it in SemanticAnalyzer
LOG.error(StringUtils.stringifyException(e));
- throw new RuntimeException (e);
+ throw new RuntimeException(e);
}
this.driverContext = driverContext;
-
+
console = new LogHelper(LOG);
}
/**
- * This method is called in the Driver on every task. It updates counters
- * and calls execute(), which is overridden in each task
+ * This method is called in the Driver on every task. It updates counters and
+ * calls execute(), which is overridden in each task
+ *
* @return return value of execute()
*/
public int executeTask() {
@@ -112,32 +118,35 @@
}
/**
- * This method is overridden in each Task.
- * TODO execute should return a TaskHandle.
+ * This method is overridden in each Task. TODO execute should return a
+ * TaskHandle.
+ *
* @return status of executing the task
*/
protected abstract int execute();
-
+
/**
- * Update the progress of the task within taskHandle and also
- * dump the progress information to the history file
- * @param taskHandle task handle returned by execute
- * @throws IOException
+ * Update the progress of the task within taskHandle and also dump the
+ * progress information to the history file
+ *
+ * @param taskHandle
+ * task handle returned by execute
+ * @throws IOException
*/
public void progress(TaskHandle taskHandle) throws IOException {
// do nothing by default
}
-
+
// dummy method - FetchTask overwrites this
- public boolean fetch(Vector<String> res) throws IOException {
+ public boolean fetch(Vector<String> res) throws IOException {
assert false;
- return false;
+ return false;
}
public void setChildTasks(List<Task<? extends Serializable>> childTasks) {
this.childTasks = childTasks;
}
-
+
public List<? extends Node> getChildren() {
return getChildTasks();
}
@@ -155,7 +164,9 @@
}
/**
- * Add a dependent task on the current task. Return if the dependency already existed or is this a new one
+ * Add a dependent task on the current task. Return if the dependency already
+ * existed or is this a new one
+ *
* @return true if the task got added false if it already existed
*/
public boolean addDependentTask(Task<? extends Serializable> dependent) {
@@ -178,13 +189,17 @@
/**
* remove the dependent task
- * @param dependent the task to remove
+ *
+ * @param dependent
+ * the task to remove
*/
public void removeDependentTask(Task<? extends Serializable> dependent) {
if ((getChildTasks() != null) && (getChildTasks().contains(dependent))) {
getChildTasks().remove(dependent);
- if ((dependent.getParentTasks() != null) && (dependent.getParentTasks().contains(this)))
+ if ((dependent.getParentTasks() != null)
+ && (dependent.getParentTasks().contains(this))) {
dependent.getParentTasks().remove(this);
+ }
}
}
@@ -223,7 +238,7 @@
public boolean isRunnable() {
boolean isrunnable = true;
if (parentTasks != null) {
- for(Task<? extends Serializable> parent: parentTasks) {
+ for (Task<? extends Serializable> parent : parentTasks) {
if (!parent.done()) {
isrunnable = false;
break;
@@ -269,8 +284,8 @@
}
/**
- * Should be overridden to return the type of the specific task among
- * the types in TaskType
+ * Should be overridden to return the type of the specific task among the
+ * types in TaskType
*
* @return TaskTypeType.* or -1 if not overridden
*/
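
To show how the Task API above fits together, here is a schematic scheduling loop.
It is a sketch only, not the actual Hive Driver code; it assumes a rootTasks
collection built elsewhere and the usual java.util / java.io.Serializable imports.

    // Schematic loop over a task DAG, using only the methods shown above:
    // done(), isRunnable(), executeTask(), getChildTasks().
    LinkedList<Task<? extends Serializable>> queue =
        new LinkedList<Task<? extends Serializable>>(rootTasks);
    while (!queue.isEmpty()) {
      Task<? extends Serializable> tsk = queue.removeFirst();
      if (tsk.done()) {
        continue;                      // already executed via another parent
      }
      if (!tsk.isRunnable()) {
        queue.addLast(tsk);            // some parent has not finished yet
        continue;
      }
      int rc = tsk.executeTask();      // updates counters, then calls execute()
      if (rc != 0) {
        break;                         // a failed stage aborts the query
      }
      List<Task<? extends Serializable>> children = tsk.getChildTasks();
      if (children != null) {
        queue.addAll(children);
      }
    }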
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java Thu Jan 21 10:37:58 2010
@@ -18,11 +18,19 @@
package org.apache.hadoop.hive.ql.exec;
-import java.util.*;
-import java.io.*;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
-import org.apache.hadoop.hive.ql.plan.*;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.plan.ConditionalWork;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.FunctionWork;
+import org.apache.hadoop.hive.ql.plan.copyWork;
+import org.apache.hadoop.hive.ql.plan.explainWork;
+import org.apache.hadoop.hive.ql.plan.fetchWork;
+import org.apache.hadoop.hive.ql.plan.mapredWork;
+import org.apache.hadoop.hive.ql.plan.moveWork;
/**
* TaskFactory implementation
@@ -46,38 +54,43 @@
taskvec.add(new taskTuple<fetchWork>(fetchWork.class, FetchTask.class));
taskvec.add(new taskTuple<copyWork>(copyWork.class, CopyTask.class));
taskvec.add(new taskTuple<DDLWork>(DDLWork.class, DDLTask.class));
- taskvec.add(new taskTuple<FunctionWork>(FunctionWork.class, FunctionTask.class));
- taskvec.add(new taskTuple<explainWork>(explainWork.class, ExplainTask.class));
- taskvec.add(new taskTuple<ConditionalWork>(ConditionalWork.class, ConditionalTask.class));
+ taskvec.add(new taskTuple<FunctionWork>(FunctionWork.class,
+ FunctionTask.class));
+ taskvec
+ .add(new taskTuple<explainWork>(explainWork.class, ExplainTask.class));
+ taskvec.add(new taskTuple<ConditionalWork>(ConditionalWork.class,
+ ConditionalTask.class));
// we are taking this out to allow us to instantiate either MapRedTask or
// ExecDriver dynamically at run time based on configuration
- // taskvec.add(new taskTuple<mapredWork>(mapredWork.class, ExecDriver.class));
+ // taskvec.add(new taskTuple<mapredWork>(mapredWork.class,
+ // ExecDriver.class));
}
- private static ThreadLocal<Integer> tid = new ThreadLocal<Integer> () {
+ private static ThreadLocal<Integer> tid = new ThreadLocal<Integer>() {
+ @Override
protected synchronized Integer initialValue() {
- return new Integer(0);
- }
+ return new Integer(0);
+ }
};
public static int getAndIncrementId() {
int curValue = tid.get().intValue();
- tid.set(new Integer(curValue+1));
+ tid.set(new Integer(curValue + 1));
return curValue;
}
-
public static void resetId() {
tid.set(new Integer(0));
}
-
+
@SuppressWarnings("unchecked")
- public static <T extends Serializable> Task<T> get(Class<T> workClass, HiveConf conf) {
-
- for(taskTuple<? extends Serializable> t: taskvec) {
- if(t.workClass == workClass) {
+ public static <T extends Serializable> Task<T> get(Class<T> workClass,
+ HiveConf conf) {
+
+ for (taskTuple<? extends Serializable> t : taskvec) {
+ if (t.workClass == workClass) {
try {
- Task<T> ret = (Task<T>)t.taskClass.newInstance();
+ Task<T> ret = (Task<T>) t.taskClass.newInstance();
ret.setId("Stage-" + Integer.toString(getAndIncrementId()));
return ret;
} catch (Exception e) {
@@ -85,57 +98,58 @@
}
}
}
-
- if(workClass == mapredWork.class) {
+
+ if (workClass == mapredWork.class) {
boolean viachild = conf.getBoolVar(HiveConf.ConfVars.SUBMITVIACHILD);
-
+
try {
// in local mode - or if otherwise so configured - always submit
// jobs via separate jvm
Task<T> ret = null;
- if(conf.getVar(HiveConf.ConfVars.HADOOPJT).equals("local") || viachild) {
- ret = (Task<T>)MapRedTask.class.newInstance();
+ if (conf.getVar(HiveConf.ConfVars.HADOOPJT).equals("local") || viachild) {
+ ret = (Task<T>) MapRedTask.class.newInstance();
} else {
- ret = (Task<T>)ExecDriver.class.newInstance();
+ ret = (Task<T>) ExecDriver.class.newInstance();
}
ret.setId("Stage-" + Integer.toString(getAndIncrementId()));
return ret;
} catch (Exception e) {
- throw new RuntimeException (e.getMessage(), e);
+ throw new RuntimeException(e.getMessage(), e);
}
}
- throw new RuntimeException ("No task for work class " + workClass.getName());
+ throw new RuntimeException("No task for work class " + workClass.getName());
}
public static <T extends Serializable> Task<T> get(T work, HiveConf conf,
- Task<? extends Serializable> ... tasklist) {
- Task<T> ret = get((Class <T>)work.getClass(), conf);
+ Task<? extends Serializable>... tasklist) {
+ Task<T> ret = get((Class<T>) work.getClass(), conf);
ret.setWork(work);
- if(tasklist.length == 0)
+ if (tasklist.length == 0) {
return (ret);
+ }
- ArrayList<Task<? extends Serializable>> clist = new ArrayList<Task<? extends Serializable>> ();
- for(Task<? extends Serializable> tsk: tasklist) {
+ ArrayList<Task<? extends Serializable>> clist = new ArrayList<Task<? extends Serializable>>();
+ for (Task<? extends Serializable> tsk : tasklist) {
clist.add(tsk);
}
ret.setChildTasks(clist);
return (ret);
}
- public static <T extends Serializable> Task<T> getAndMakeChild(
- T work, HiveConf conf,
- Task<? extends Serializable> ... tasklist) {
- Task<T> ret = get((Class <T>)work.getClass(), conf);
+ public static <T extends Serializable> Task<T> getAndMakeChild(T work,
+ HiveConf conf, Task<? extends Serializable>... tasklist) {
+ Task<T> ret = get((Class<T>) work.getClass(), conf);
ret.setWork(work);
- if(tasklist.length == 0)
+ if (tasklist.length == 0) {
return (ret);
+ }
// Add the new task as child of each of the passed in tasks
- for(Task<? extends Serializable> tsk: tasklist) {
+ for (Task<? extends Serializable> tsk : tasklist) {
List<Task<? extends Serializable>> children = tsk.getChildTasks();
if (children == null) {
children = new ArrayList<Task<? extends Serializable>>();
@@ -143,7 +157,7 @@
children.add(ret);
tsk.setChildTasks(children);
}
-
+
return (ret);
}
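
For illustration, typical callers of the factory look like the sketch below. The
plan objects mrWork and mvWork stand in for work descriptors that the semantic
analyzer would normally build, and the no-arg constructors and HiveConf(Class)
constructor are assumptions made for brevity:

    // Sketch: create a map-reduce stage and hang a move stage off it.
    HiveConf conf = new HiveConf(TaskFactory.class);   // assumed constructor
    mapredWork mrWork = new mapredWork();              // placeholder plan objects
    moveWork mvWork = new moveWork();
    Task<mapredWork> mrTask = TaskFactory.get(mrWork, conf);   // MapRedTask or ExecDriver
    Task<moveWork> mvTask = TaskFactory.getAndMakeChild(mvWork, conf, mrTask);
    // mvTask is now a child of mrTask, so it becomes runnable only after
    // the map-reduce stage finishes.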
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskHandle.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskHandle.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskHandle.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskHandle.java Thu Jan 21 10:37:58 2010
@@ -5,10 +5,13 @@
import org.apache.hadoop.mapred.Counters;
public class TaskHandle {
- // The eventual goal is to monitor the progress of all the tasks, not only the map reduce task.
- // The execute() method of the tasks will return immediately, and return a task specific handle to
- // monitor the progress of that task.
- // Right now, the behavior is kind of broken, ExecDriver's execute method calls progress - instead it should
+ // The eventual goal is to monitor the progress of all the tasks, not only the
+ // map reduce task.
+ // The execute() method of the tasks will return immediately, and return a
+ // task specific handle to
+ // monitor the progress of that task.
+ // Right now, the behavior is kind of broken, ExecDriver's execute method
+ // calls progress - instead it should
// be invoked by Driver
public Counters getCounters() throws IOException {
// default implementation
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskResult.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskResult.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskResult.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskResult.java Thu Jan 21 10:37:58 2010
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.exec;
-import java.util.*;
/**
* TaskResult implementation
@@ -27,6 +26,7 @@
public class TaskResult {
protected int exitVal;
protected boolean runStatus;
+
public TaskResult() {
exitVal = -1;
setRunning(true);
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java Thu Jan 21 10:37:58 2010
@@ -18,19 +18,9 @@
package org.apache.hadoop.hive.ql.exec;
-import java.io.*;
-import java.util.*;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.QueryPlan;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
-import org.apache.hadoop.util.StringUtils;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import java.io.Serializable;
+import org.apache.hadoop.hive.ql.session.SessionState;
/**
* TaskRunner implementation
@@ -51,6 +41,7 @@
return tsk;
}
+ @Override
public void run() {
SessionState.start(ss);
runSequential();
@@ -65,5 +56,4 @@
result.setExitVal(exitVal);
}
-
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TerminalOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TerminalOperator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TerminalOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TerminalOperator.java Thu Jan 21 10:37:58 2010
@@ -18,13 +18,13 @@
package org.apache.hadoop.hive.ql.exec;
-import java.io.*;
+import java.io.Serializable;
/**
* Terminal Operator Base Class
**/
-public abstract class TerminalOperator <T extends Serializable> extends Operator <T>
- implements Serializable {
+public abstract class TerminalOperator<T extends Serializable> extends
+ Operator<T> implements Serializable {
private static final long serialVersionUID = 1L;
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordReader.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordReader.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordReader.java Thu Jan 21 10:37:58 2010
@@ -22,20 +22,19 @@
import java.io.InputStream;
import java.util.Properties;
-import org.apache.hadoop.mapred.LineRecordReader.LineReader;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
-
+import org.apache.hadoop.mapred.LineRecordReader.LineReader;
public class TextRecordReader implements RecordReader {
- private LineReader lineReader;
+ private LineReader lineReader;
private InputStream in;
- private Text row;
+ private Text row;
- public void initialize(InputStream in, Configuration conf, Properties tbl) throws IOException {
+ public void initialize(InputStream in, Configuration conf, Properties tbl)
+ throws IOException {
lineReader = new LineReader(in, conf);
this.in = in;
}
@@ -46,14 +45,16 @@
}
public int next(Writable row) throws IOException {
- if (lineReader == null)
+ if (lineReader == null) {
return -1;
+ }
- return lineReader.readLine((Text)row);
+ return lineReader.readLine((Text) row);
}
public void close() throws IOException {
- if (in != null)
+ if (in != null) {
in.close();
+ }
}
}
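
A small usage sketch for the reader above (test-style code inside a method that
declares throws IOException; the input string is invented for illustration and is
not taken from the patch):

    // Feed two tab-separated rows through TextRecordReader and print them.
    java.io.InputStream in =
        new java.io.ByteArrayInputStream("a\tb\nc\td\n".getBytes());
    TextRecordReader reader = new TextRecordReader();
    reader.initialize(in, new org.apache.hadoop.conf.Configuration(),
        new java.util.Properties());
    org.apache.hadoop.io.Text row = new org.apache.hadoop.io.Text();
    while (reader.next(row) > 0) {      // next() returns bytes read, 0 at end of stream
      System.out.println(row);          // "a\tb", then "c\td"
    }
    reader.close();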
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordWriter.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordWriter.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordWriter.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TextRecordWriter.java Thu Jan 21 10:37:58 2010
@@ -22,19 +22,20 @@
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
public class TextRecordWriter implements RecordWriter {
private OutputStream out;
- public void initialize(OutputStream out, Configuration conf) throws IOException {
+ public void initialize(OutputStream out, Configuration conf)
+ throws IOException {
this.out = out;
}
public void write(Writable row) throws IOException {
- Text text = (Text)row;
+ Text text = (Text) row;
out.write(text.getBytes(), 0, text.getLength());
out.write(Utilities.newLineCode);
}
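
And the matching writer side, as a sketch exercising only the initialize() and
write() methods shown above (same throws IOException caveat; not from the patch):

    // Write one row; TextRecordWriter appends the newline code after it.
    java.io.ByteArrayOutputStream out = new java.io.ByteArrayOutputStream();
    TextRecordWriter writer = new TextRecordWriter();
    writer.initialize(out, new org.apache.hadoop.conf.Configuration());
    writer.write(new org.apache.hadoop.io.Text("key1\tvalue1"));
    // out now holds the bytes of "key1\tvalue1" plus a trailing newline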
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Throttle.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Throttle.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Throttle.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Throttle.java Thu Jan 21 10:37:58 2010
@@ -18,21 +18,13 @@
package org.apache.hadoop.hive.ql.exec;
-import java.io.*;
-import java.util.*;
-import java.util.regex.Pattern;
+import java.io.IOException;
+import java.io.InputStream;
import java.net.URL;
-import java.net.URLEncoder;
-import java.net.URLDecoder;
-import java.net.MalformedURLException;
-import java.net.InetSocketAddress;
+import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobTracker;
/*
* Intelligence to make clients wait if the cluster is in a bad state.
@@ -51,14 +43,14 @@
/**
* fetch http://tracker.om:/gc.jsp?threshold=period
*/
- static void checkJobTracker(JobConf conf, Log LOG) {
+ static void checkJobTracker(JobConf conf, Log LOG) {
try {
- byte buffer[] = new byte[1024];
+ byte buffer[] = new byte[1024];
int threshold = conf.getInt("mapred.throttle.threshold.percent",
- DEFAULT_MEMORY_GC_PERCENT);
+ DEFAULT_MEMORY_GC_PERCENT);
int retry = conf.getInt("mapred.throttle.retry.period",
- DEFAULT_RETRY_PERIOD);
+ DEFAULT_RETRY_PERIOD);
// If the threshold is 100 percent, then there is no throttling
if (threshold == 100) {
@@ -66,35 +58,35 @@
}
// This is the Job Tracker URL
- String tracker = JobTrackerURLResolver.getURL(conf) +
- "/gc.jsp?threshold=" + threshold;
+ String tracker = JobTrackerURLResolver.getURL(conf)
+ + "/gc.jsp?threshold=" + threshold;
while (true) {
// read in the first 1K characters from the URL
URL url = new URL(tracker);
LOG.debug("Throttle: URL " + tracker);
InputStream in = url.openStream();
- int numRead = in.read(buffer);
+ in.read(buffer);
in.close();
String fetchString = new String(buffer);
// fetch the xml tag <dogc>xxx</dogc>
- Pattern dowait = Pattern.compile("<dogc>",
- Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE);
+ Pattern dowait = Pattern.compile("<dogc>", Pattern.CASE_INSENSITIVE
+ | Pattern.DOTALL | Pattern.MULTILINE);
String[] results = dowait.split(fetchString);
if (results.length != 2) {
- throw new IOException("Throttle: Unable to parse response of URL " + url +
- ". Get retuned " + fetchString);
+ throw new IOException("Throttle: Unable to parse response of URL "
+ + url + ". Get retuned " + fetchString);
}
- dowait = Pattern.compile("</dogc>",
- Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE);
+ dowait = Pattern.compile("</dogc>", Pattern.CASE_INSENSITIVE
+ | Pattern.DOTALL | Pattern.MULTILINE);
results = dowait.split(results[1]);
if (results.length < 1) {
- throw new IOException("Throttle: Unable to parse response of URL " + url +
- ". Get retuned " + fetchString);
+ throw new IOException("Throttle: Unable to parse response of URL "
+ + url + ". Get retuned " + fetchString);
}
- // if the jobtracker signalled that the threshold is not exceeded,
+ // if the jobtracker signalled that the threshold is not exceeded,
// then we return immediately.
if (results[0].trim().compareToIgnoreCase("false") == 0) {
return;
@@ -102,8 +94,8 @@
// The JobTracker has exceeded its threshold and is doing a GC.
// The client has to wait and retry.
- LOG.warn("Job is being throttled because of resource crunch on the " +
- "JobTracker. Will retry in " + retry + " seconds..");
+ LOG.warn("Job is being throttled because of resource crunch on the "
+ + "JobTracker. Will retry in " + retry + " seconds..");
Thread.sleep(retry * 1000L);
}
} catch (Exception e) {