Posted to commits@hive.apache.org by zs...@apache.org on 2010/01/21 11:38:15 UTC
svn commit: r901644 [7/37] - in /hadoop/hive/trunk: ./
ql/src/java/org/apache/hadoop/hive/ql/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/
ql/src/java/org/apache/hadoop/hive/ql/history/ ql/src/java...
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Thu Jan 21 10:37:58 2010
@@ -34,7 +34,6 @@
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.mapredWork;
import org.apache.hadoop.hive.ql.plan.partitionDesc;
-import org.apache.hadoop.hive.ql.plan.tableDesc;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -46,16 +45,19 @@
import org.apache.hadoop.io.Writable;
/**
- * Map operator. This triggers overall map side processing.
- * This is a little different from regular operators in that
- * it starts off by processing a Writable data structure from
- * a Table (instead of a Hive Object).
+ * Map operator. This triggers overall map side processing. This is a little
+ * different from regular operators in that it starts off by processing a
+ * Writable data structure from a Table (instead of a Hive Object).
**/
-public class MapOperator extends Operator <mapredWork> implements Serializable {
+public class MapOperator extends Operator<mapredWork> implements Serializable {
private static final long serialVersionUID = 1L;
- public static enum Counter {DESERIALIZE_ERRORS}
- transient private LongWritable deserialize_error_count = new LongWritable ();
+
+ public static enum Counter {
+ DESERIALIZE_ERRORS
+ }
+
+ transient private final LongWritable deserialize_error_count = new LongWritable();
transient private Deserializer deserializer;
transient private Object[] rowWithPart;
@@ -65,7 +67,7 @@
private Map<Operator<? extends Serializable>, java.util.ArrayList<String>> operatorToPaths;
- private java.util.ArrayList<String> childrenPaths = new ArrayList<String>();
+ private final java.util.ArrayList<String> childrenPaths = new ArrayList<String>();
private ArrayList<Operator<? extends Serializable>> extraChildrenToClose = null;
@@ -86,27 +88,31 @@
this.op = op;
}
+ @Override
public boolean equals(Object o) {
if (o instanceof MapInputPath) {
- MapInputPath mObj = (MapInputPath)o;
- if (mObj == null)
+ MapInputPath mObj = (MapInputPath) o;
+ if (mObj == null) {
return false;
- return path.equals(mObj.path) && alias.equals(mObj.alias) && op.equals(mObj.op);
+ }
+ return path.equals(mObj.path) && alias.equals(mObj.alias)
+ && op.equals(mObj.op);
}
return false;
}
+ @Override
public int hashCode() {
return (op == null) ? 0 : op.hashCode();
}
}
private static class MapOpCtx {
- boolean isPartitioned;
+ boolean isPartitioned;
StructObjectInspector rowObjectInspector;
- Object[] rowWithPart;
- Deserializer deserializer;
+ Object[] rowWithPart;
+ Deserializer deserializer;
public String tableName;
public String partName;
@@ -116,7 +122,8 @@
* @param rowWithPart
*/
public MapOpCtx(boolean isPartitioned,
- StructObjectInspector rowObjectInspector, Object[] rowWithPart, Deserializer deserializer) {
+ StructObjectInspector rowObjectInspector, Object[] rowWithPart,
+ Deserializer deserializer) {
this.isPartitioned = isPartitioned;
this.rowObjectInspector = rowObjectInspector;
this.rowWithPart = rowWithPart;
@@ -153,78 +160,89 @@
}
/**
- * Initializes this map op as the root of the tree. It sets JobConf & MapRedWork
- * and starts initialization of the operator tree rooted at this op.
+ * Initializes this map op as the root of the tree. It sets JobConf &
+ * MapRedWork and starts initialization of the operator tree rooted at this
+ * op.
+ *
* @param hconf
* @param mrwork
* @throws HiveException
*/
- public void initializeAsRoot(Configuration hconf, mapredWork mrwork) throws HiveException {
+ public void initializeAsRoot(Configuration hconf, mapredWork mrwork)
+ throws HiveException {
setConf(mrwork);
setChildren(hconf);
initialize(hconf, null);
}
- private static MapOpCtx initObjectInspector(mapredWork conf, Configuration hconf, String onefile)
- throws HiveException, ClassNotFoundException, InstantiationException, IllegalAccessException, SerDeException {
+ private static MapOpCtx initObjectInspector(mapredWork conf,
+ Configuration hconf, String onefile) throws HiveException,
+ ClassNotFoundException, InstantiationException, IllegalAccessException,
+ SerDeException {
partitionDesc td = conf.getPathToPartitionInfo().get(onefile);
LinkedHashMap<String, String> partSpec = td.getPartSpec();
Properties tblProps = td.getProperties();
Class sdclass = td.getDeserializerClass();
- if(sdclass == null) {
+ if (sdclass == null) {
String className = td.getSerdeClassName();
if ((className == "") || (className == null)) {
- throw new HiveException("SerDe class or the SerDe class name is not set for table: "
- + td.getProperties().getProperty("name"));
+ throw new HiveException(
+ "SerDe class or the SerDe class name is not set for table: "
+ + td.getProperties().getProperty("name"));
}
sdclass = hconf.getClassByName(className);
}
String tableName = String.valueOf(tblProps.getProperty("name"));
String partName = String.valueOf(partSpec);
- //HiveConf.setVar(hconf, HiveConf.ConfVars.HIVETABLENAME, tableName);
- //HiveConf.setVar(hconf, HiveConf.ConfVars.HIVEPARTITIONNAME, partName);
+ // HiveConf.setVar(hconf, HiveConf.ConfVars.HIVETABLENAME, tableName);
+ // HiveConf.setVar(hconf, HiveConf.ConfVars.HIVEPARTITIONNAME, partName);
Deserializer deserializer = (Deserializer) sdclass.newInstance();
deserializer.initialize(hconf, tblProps);
- StructObjectInspector rowObjectInspector = (StructObjectInspector)deserializer.getObjectInspector();
+ StructObjectInspector rowObjectInspector = (StructObjectInspector) deserializer
+ .getObjectInspector();
MapOpCtx opCtx = null;
// Next check if this table has partitions and if so
// get the list of partition names as well as allocate
// the serdes for the partition columns
- String pcols = tblProps.getProperty(org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_PARTITION_COLUMNS);
- //Log LOG = LogFactory.getLog(MapOperator.class.getName());
+ String pcols = tblProps
+ .getProperty(org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_PARTITION_COLUMNS);
+ // Log LOG = LogFactory.getLog(MapOperator.class.getName());
if (pcols != null && pcols.length() > 0) {
String[] partKeys = pcols.trim().split("/");
List<String> partNames = new ArrayList<String>(partKeys.length);
Object[] partValues = new Object[partKeys.length];
- List<ObjectInspector> partObjectInspectors = new ArrayList<ObjectInspector>(partKeys.length);
- for(int i = 0; i < partKeys.length; i++ ) {
+ List<ObjectInspector> partObjectInspectors = new ArrayList<ObjectInspector>(
+ partKeys.length);
+ for (int i = 0; i < partKeys.length; i++) {
String key = partKeys[i];
partNames.add(key);
// Partitions do not exist for this table
- if (partSpec == null)
+ if (partSpec == null) {
partValues[i] = new Text();
- else
+ } else {
partValues[i] = new Text(partSpec.get(key));
- partObjectInspectors.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
+ }
+ partObjectInspectors
+ .add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
}
StructObjectInspector partObjectInspector = ObjectInspectorFactory
- .getStandardStructObjectInspector(partNames, partObjectInspectors);
+ .getStandardStructObjectInspector(partNames, partObjectInspectors);
Object[] rowWithPart = new Object[2];
rowWithPart[1] = partValues;
rowObjectInspector = ObjectInspectorFactory
- .getUnionStructObjectInspector(
- Arrays.asList(new StructObjectInspector[]{
- rowObjectInspector,
- partObjectInspector}));
- //LOG.info("dump " + tableName + " " + partName + " " + rowObjectInspector.getTypeName());
+ .getUnionStructObjectInspector(Arrays
+ .asList(new StructObjectInspector[] { rowObjectInspector,
+ partObjectInspector }));
+ // LOG.info("dump " + tableName + " " + partName + " " +
+ // rowObjectInspector.getTypeName());
opCtx = new MapOpCtx(true, rowObjectInspector, rowWithPart, deserializer);
- }
- else {
- //LOG.info("dump2 " + tableName + " " + partName + " " + rowObjectInspector.getTypeName());
+ } else {
+ // LOG.info("dump2 " + tableName + " " + partName + " " +
+ // rowObjectInspector.getTypeName());
opCtx = new MapOpCtx(false, rowObjectInspector, null, deserializer);
}
opCtx.tableName = tableName;
@@ -236,10 +254,9 @@
Path fpath = new Path((new Path(HiveConf.getVar(hconf,
HiveConf.ConfVars.HADOOPMAPFILENAME))).toUri().getPath());
- ArrayList<Operator<? extends Serializable>> children =
- new ArrayList<Operator<? extends Serializable>>();
+ ArrayList<Operator<? extends Serializable>> children = new ArrayList<Operator<? extends Serializable>>();
opCtxMap = new HashMap<MapInputPath, MapOpCtx>();
- operatorToPaths = new HashMap<Operator<? extends Serializable>, java.util.ArrayList<String>> ();
+ operatorToPaths = new HashMap<Operator<? extends Serializable>, java.util.ArrayList<String>>();
statsMap.put(Counter.DESERIALIZE_ERRORS, deserialize_error_count);
@@ -256,17 +273,21 @@
+ fpath.toUri().getPath());
MapInputPath inp = new MapInputPath(onefile, onealias, op);
opCtxMap.put(inp, opCtx);
- if(operatorToPaths.get(op) == null)
- operatorToPaths.put(op, new java.util.ArrayList<String>());
+ if (operatorToPaths.get(op) == null) {
+ operatorToPaths.put(op, new java.util.ArrayList<String>());
+ }
operatorToPaths.get(op).add(onefile);
- op.setParentOperators(new ArrayList<Operator<? extends Serializable>>());
+ op
+ .setParentOperators(new ArrayList<Operator<? extends Serializable>>());
op.getParentOperators().add(this);
- // check for the operators who will process rows coming to this Map Operator
+ // check for the operators that will process rows coming to this Map
+ // Operator
if (!onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) {
children.add(op);
childrenPaths.add(onefile);
- LOG.info("dump " + op.getName() + " " + opCtxMap.get(inp).getRowObjectInspector().getTypeName());
+ LOG.info("dump " + op.getName() + " "
+ + opCtxMap.get(inp).getRowObjectInspector().getTypeName());
if (!done) {
deserializer = opCtxMap.get(inp).getDeserializer();
isPartitioned = opCtxMap.get(inp).isPartitioned();
@@ -292,48 +313,55 @@
}
}
-
+ @Override
public void initializeOp(Configuration hconf) throws HiveException {
// set that parent initialization is done and call initialize on children
state = State.INIT;
List<Operator<? extends Serializable>> children = getChildOperators();
for (Entry<MapInputPath, MapOpCtx> entry : opCtxMap.entrySet()) {
- // Add alias, table name, and partitions to hadoop conf so that their children will
+ // Add alias, table name, and partitions to hadoop conf so that their
+ // children will
// inherit these
- HiveConf.setVar(hconf, HiveConf.ConfVars.HIVETABLENAME, entry.getValue().tableName);
- HiveConf.setVar(hconf, HiveConf.ConfVars.HIVEPARTITIONNAME, entry.getValue().partName);
- MapInputPath input = entry.getKey();
+ HiveConf.setVar(hconf, HiveConf.ConfVars.HIVETABLENAME,
+ entry.getValue().tableName);
+ HiveConf.setVar(hconf, HiveConf.ConfVars.HIVEPARTITIONNAME, entry
+ .getValue().partName);
+ MapInputPath input = entry.getKey();
Operator<? extends Serializable> op = input.op;
- // op is not in the children list, so need to remember it and close it afterwards
- if ( children.indexOf(op) == -1 ) {
- if ( extraChildrenToClose == null ) {
+ // op is not in the children list, so need to remember it and close it
+ // afterwards
+ if (children.indexOf(op) == -1) {
+ if (extraChildrenToClose == null) {
extraChildrenToClose = new ArrayList<Operator<? extends Serializable>>();
}
extraChildrenToClose.add(op);
}
- // multiple input paths may corresponding the same operator (tree). The
- // below logic is to avoid initialize one operator multiple times if there
- // is one input path in this mapper's input paths.
+ // multiple input paths may correspond to the same operator (tree). The
+ // logic below avoids initializing one operator multiple times when more
+ // than one of this mapper's input paths leads to it.
boolean shouldInit = true;
List<String> paths = operatorToPaths.get(op);
- for(String path: paths) {
- if(childrenPaths.contains(path) && !path.equals(input.path)) {
- shouldInit = false;
- break;
- }
+ for (String path : paths) {
+ if (childrenPaths.contains(path) && !path.equals(input.path)) {
+ shouldInit = false;
+ break;
+ }
+ }
+ if (shouldInit) {
+ op.initialize(hconf, new ObjectInspector[] { entry.getValue()
+ .getRowObjectInspector() });
}
- if(shouldInit)
- op.initialize(hconf, new ObjectInspector[]{entry.getValue().getRowObjectInspector()});
}
}
/**
* close extra child operators that are initialized but are not executed.
*/
+ @Override
public void closeOp(boolean abort) throws HiveException {
- if ( extraChildrenToClose != null ) {
+ if (extraChildrenToClose != null) {
for (Operator<? extends Serializable> op : extraChildrenToClose) {
op.close(abort);
}
@@ -351,16 +379,17 @@
}
} catch (SerDeException e) {
// TODO: policy on deserialization errors
- deserialize_error_count.set(deserialize_error_count.get()+1);
- throw new HiveException (e);
+ deserialize_error_count.set(deserialize_error_count.get() + 1);
+ throw new HiveException(e);
}
}
- public void processOp(Object row, int tag)
- throws HiveException {
+ @Override
+ public void processOp(Object row, int tag) throws HiveException {
throw new HiveException("Hive 2 Internal error: should not be called!");
}
+ @Override
public String getName() {
return "MAP";
}
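
For readers following the reformatted initObjectInspector() above: the method's core is building one StructObjectInspector that unions the deserialized row with the partition columns. A minimal standalone sketch of that construction, reusing the same Hive factory calls the method does (the column and partition names here are invented for illustration):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;

public class RowWithPartSketch {
  public static void main(String[] args) {
    // Partition columns are always inspected as writable strings, as above.
    List<String> partNames = Arrays.asList("ds", "hr"); // hypothetical keys
    List<ObjectInspector> partOIs = new ArrayList<ObjectInspector>();
    for (int i = 0; i < partNames.size(); i++) {
      partOIs.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
    }
    StructObjectInspector partOI = ObjectInspectorFactory
        .getStandardStructObjectInspector(partNames, partOIs);

    // The real row inspector comes from the table's Deserializer; a one-column
    // struct stands in for it so the sketch runs on its own.
    List<ObjectInspector> colOIs = new ArrayList<ObjectInspector>();
    colOIs.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
    StructObjectInspector rowOI = ObjectInspectorFactory
        .getStandardStructObjectInspector(Arrays.asList("col0"), colOIs);

    // Union the two: slot 0 of rowWithPart holds the deserialized row, slot 1
    // the partition values, matching the layout MapOperator forwards.
    StructObjectInspector rowWithPartOI = ObjectInspectorFactory
        .getUnionStructObjectInspector(Arrays.asList(new StructObjectInspector[] {
            rowOI, partOI }));
    Object[] rowWithPart = new Object[2];
    rowWithPart[1] = new Object[] { new Text("2010-01-21"), new Text("11") };
    System.out.println(rowWithPartOI.getTypeName());
  }
}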
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java Thu Jan 21 10:37:58 2010
@@ -35,22 +35,23 @@
import org.apache.hadoop.hive.shims.ShimLoader;
/**
- * Alternate implementation (to ExecDriver) of spawning a mapreduce task that runs it from
- * a separate jvm. The primary issue with this is the inability to control logging from
- * a separate jvm in a consistent manner
+ * Alternate implementation (to ExecDriver) of spawning a mapreduce task that
+ * runs it from a separate jvm. The primary issue with this is the inability to
+ * control logging from a separate jvm in a consistent manner
**/
public class MapRedTask extends Task<mapredWork> implements Serializable {
-
+
private static final long serialVersionUID = 1L;
final static String hadoopMemKey = "HADOOP_HEAPSIZE";
final static String hadoopOptsKey = "HADOOP_OPTS";
- final static String HIVE_SYS_PROP[] = {"build.dir", "build.dir.hive"};
-
+ final static String HIVE_SYS_PROP[] = { "build.dir", "build.dir.hive" };
+
public MapRedTask() {
super();
}
-
+
+ @Override
public int execute() {
try {
@@ -60,7 +61,8 @@
String libJarsOption;
{
- String addedJars = ExecDriver.getResourceFiles(conf, SessionState.ResourceType.JAR);
+ String addedJars = ExecDriver.getResourceFiles(conf,
+ SessionState.ResourceType.JAR);
conf.setVar(ConfVars.HIVEADDEDJARS, addedJars);
String auxJars = conf.getAuxJars();
@@ -76,14 +78,14 @@
libJarsOption = " -libjars " + addedJars + " ";
} else {
libJarsOption = " -libjars " + addedJars + "," + auxJars + " ";
- }
+ }
}
}
// Generate the hiveConfArgs after potentially adding the jars
String hiveConfArgs = ExecDriver.generateCmdLine(conf);
File scratchDir = new File(conf.getVar(HiveConf.ConfVars.SCRATCHDIR));
-
+
mapredWork plan = getWork();
File planFile = File.createTempFile("plan", ".xml", scratchDir);
@@ -91,21 +93,22 @@
FileOutputStream out = new FileOutputStream(planFile);
Utilities.serializeMapRedWork(plan, out);
- String isSilent = "true".equalsIgnoreCase(System.getProperty("test.silent"))
- ? "-silent" : "";
+ String isSilent = "true".equalsIgnoreCase(System
+ .getProperty("test.silent")) ? "-silent" : "";
String jarCmd;
- if(ShimLoader.getHadoopShims().usesJobShell()) {
+ if (ShimLoader.getHadoopShims().usesJobShell()) {
jarCmd = libJarsOption + hiveJar + " " + ExecDriver.class.getName();
} else {
jarCmd = hiveJar + " " + ExecDriver.class.getName() + libJarsOption;
}
- String cmdLine = hadoopExec + " jar " + jarCmd +
- " -plan " + planFile.toString() + " " + isSilent + " " + hiveConfArgs;
-
- String files = ExecDriver.getResourceFiles(conf, SessionState.ResourceType.FILE);
- if(!files.isEmpty()) {
+ String cmdLine = hadoopExec + " jar " + jarCmd + " -plan "
+ + planFile.toString() + " " + isSilent + " " + hiveConfArgs;
+
+ String files = ExecDriver.getResourceFiles(conf,
+ SessionState.ResourceType.FILE);
+ if (!files.isEmpty()) {
cmdLine = cmdLine + " -files " + files;
}
@@ -117,63 +120,65 @@
{
StringBuilder sb = new StringBuilder();
Properties p = System.getProperties();
- for (int k = 0; k < HIVE_SYS_PROP.length; k++) {
- if (p.containsKey(HIVE_SYS_PROP[k])) {
- sb.append(" -D" + HIVE_SYS_PROP[k] + "=" + p.getProperty(HIVE_SYS_PROP[k]));
+ for (String element : HIVE_SYS_PROP) {
+ if (p.containsKey(element)) {
+ sb.append(" -D" + element + "=" + p.getProperty(element));
}
}
hadoopOpts = sb.toString();
}
-
+
// Inherit the environment variables
String[] env;
{
Map<String, String> variables = new HashMap(System.getenv());
// The user can specify the hadoop memory
int hadoopMem = conf.getIntVar(HiveConf.ConfVars.HIVEHADOOPMAXMEM);
-
+
if (hadoopMem == 0) {
variables.remove(hadoopMemKey);
} else {
// user specified the memory - only applicable for local mode
variables.put(hadoopMemKey, String.valueOf(hadoopMem));
}
-
+
if (variables.containsKey(hadoopOptsKey)) {
- variables.put(hadoopOptsKey, variables.get(hadoopOptsKey) + hadoopOpts);
+ variables.put(hadoopOptsKey, variables.get(hadoopOptsKey)
+ + hadoopOpts);
} else {
variables.put(hadoopOptsKey, hadoopOpts);
}
-
+
env = new String[variables.size()];
int pos = 0;
- for (Map.Entry<String, String> entry : variables.entrySet()) {
- String name = entry.getKey();
- String value = entry.getValue();
- env[pos++] = name + "=" + value;
- }
+ for (Map.Entry<String, String> entry : variables.entrySet()) {
+ String name = entry.getKey();
+ String value = entry.getValue();
+ env[pos++] = name + "=" + value;
+ }
}
-
+
// Run ExecDriver in another JVM
executor = Runtime.getRuntime().exec(cmdLine, env);
- StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out);
- StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, System.err);
-
+ StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(),
+ null, System.out);
+ StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(),
+ null, System.err);
+
outPrinter.start();
errPrinter.start();
-
+
int exitVal = executor.waitFor();
- if(exitVal != 0) {
+ if (exitVal != 0) {
LOG.error("Execution failed with exit status: " + exitVal);
} else {
LOG.info("Execution completed successfully");
}
return exitVal;
- }
- catch (Exception e) {
+ } catch (Exception e) {
e.printStackTrace();
LOG.error("Exception: " + e.getMessage());
return (1);
@@ -190,7 +195,8 @@
mapredWork w = getWork();
return w.getReducer() != null;
}
-
+
+ @Override
public int getType() {
return StageType.MAPREDLOCAL;
}
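
A note on the pattern execute() relies on here: the parent must drain the child JVM's stdout and stderr (Hive uses StreamPrinter threads for this) before waitFor(), or the child can block once a pipe buffer fills. A plain-JDK sketch of the same shape, with "sleep 1" as a unix placeholder for the real "hadoop jar ..." command line the task assembles:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;

public class ChildJvmSketch {
  // Start a thread that copies one of the child's streams, line by line,
  // to the given sink; this is the role StreamPrinter plays for Hive.
  static Thread drain(final InputStream in, final PrintStream out) {
    Thread t = new Thread() {
      public void run() {
        try {
          BufferedReader r = new BufferedReader(new InputStreamReader(in));
          String line;
          while ((line = r.readLine()) != null) {
            out.println(line);
          }
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    };
    t.start();
    return t;
  }

  public static void main(String[] args) throws Exception {
    Process p = Runtime.getRuntime().exec("sleep 1"); // placeholder command
    Thread outT = drain(p.getInputStream(), System.out);
    Thread errT = drain(p.getErrorStream(), System.err);
    int exitVal = p.waitFor(); // safe: both pipes are being emptied
    outT.join();
    errT.join();
    System.out.println("exit status: " + exitVal);
  }
}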
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Thu Jan 21 10:37:58 2010
@@ -51,6 +51,7 @@
super();
}
+ @Override
public int execute() {
try {
@@ -64,39 +65,44 @@
if (lfd.getIsDfsDir()) {
// Just do a rename on the URIs, they belong to the same FS
String mesg = "Moving data to: " + lfd.getTargetDir();
- String mesg_detail = " from " + lfd.getSourceDir();
+ String mesg_detail = " from " + lfd.getSourceDir();
console.printInfo(mesg, mesg_detail);
// delete the output directory if it already exists
fs.delete(targetPath, true);
// if source exists, rename. Otherwise, create an empty directory
if (fs.exists(sourcePath)) {
- if (!fs.rename(sourcePath, targetPath))
- throw new HiveException ("Unable to rename: " + sourcePath + " to: "
- + targetPath);
- } else
- if (!fs.mkdirs(targetPath))
- throw new HiveException ("Unable to make directory: " + targetPath);
+ if (!fs.rename(sourcePath, targetPath)) {
+ throw new HiveException("Unable to rename: " + sourcePath
+ + " to: " + targetPath);
+ }
+ } else if (!fs.mkdirs(targetPath)) {
+ throw new HiveException("Unable to make directory: " + targetPath);
+ }
} else {
// This is a local file
String mesg = "Copying data to local directory " + lfd.getTargetDir();
- String mesg_detail = " from " + lfd.getSourceDir();
+ String mesg_detail = " from " + lfd.getSourceDir();
console.printInfo(mesg, mesg_detail);
// delete the existing dest directory
LocalFileSystem dstFs = FileSystem.getLocal(conf);
- if(dstFs.delete(targetPath, true) || !dstFs.exists(targetPath)) {
+ if (dstFs.delete(targetPath, true) || !dstFs.exists(targetPath)) {
console.printInfo(mesg, mesg_detail);
// if source exists, rename. Otherwise, create an empty directory
- if (fs.exists(sourcePath))
+ if (fs.exists(sourcePath)) {
fs.copyToLocalFile(sourcePath, targetPath);
- else {
- if (!dstFs.mkdirs(targetPath))
- throw new HiveException ("Unable to make local directory: " + targetPath);
+ } else {
+ if (!dstFs.mkdirs(targetPath)) {
+ throw new HiveException("Unable to make local directory: "
+ + targetPath);
+ }
}
} else {
- throw new AccessControlException("Unable to delete the existing destination directory: " + targetPath);
+ throw new AccessControlException(
+ "Unable to delete the existing destination directory: "
+ + targetPath);
}
}
}
@@ -104,55 +110,69 @@
// Next we do this for tables and partitions
loadTableDesc tbd = work.getLoadTableWork();
if (tbd != null) {
- String mesg = "Loading data to table " + tbd.getTable().getTableName() +
- ((tbd.getPartitionSpec().size() > 0) ?
- " partition " + tbd.getPartitionSpec().toString() : "");
+ String mesg = "Loading data to table "
+ + tbd.getTable().getTableName()
+ + ((tbd.getPartitionSpec().size() > 0) ? " partition "
+ + tbd.getPartitionSpec().toString() : "");
String mesg_detail = " from " + tbd.getSourceDir();
console.printInfo(mesg, mesg_detail);
- Table table = db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, tbd.getTable().getTableName());
+ Table table = db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, tbd
+ .getTable().getTableName());
if (work.getCheckFileFormat()) {
// Get all files from the src directory
- FileStatus [] dirs;
+ FileStatus[] dirs;
ArrayList<FileStatus> files;
FileSystem fs;
try {
- fs = FileSystem.get(table.getDataLocation(),conf);
+ fs = FileSystem.get(table.getDataLocation(), conf);
dirs = fs.globStatus(new Path(tbd.getSourceDir()));
files = new ArrayList<FileStatus>();
- for (int i=0; (dirs != null && i<dirs.length); i++) {
+ for (int i = 0; (dirs != null && i < dirs.length); i++) {
files.addAll(Arrays.asList(fs.listStatus(dirs[i].getPath())));
- // We only check one file, so exit the loop when we have at least one.
- if (files.size()>0) break;
+ // We only check one file, so exit the loop when we have at least
+ // one.
+ if (files.size() > 0) {
+ break;
+ }
}
} catch (IOException e) {
- throw new HiveException("addFiles: filesystem error in check phase", e);
+ throw new HiveException(
+ "addFiles: filesystem error in check phase", e);
}
// Check if the file format of the file matches that of the table.
- boolean flag = HiveFileFormatUtils.checkInputFormat(fs, conf, tbd.getTable().getInputFileFormatClass(), files);
- if(!flag)
- throw new HiveException("Wrong file format. Please check the file's format.");
+ boolean flag = HiveFileFormatUtils.checkInputFormat(fs, conf, tbd
+ .getTable().getInputFileFormatClass(), files);
+ if (!flag) {
+ throw new HiveException(
+ "Wrong file format. Please check the file's format.");
+ }
}
- if(tbd.getPartitionSpec().size() == 0) {
- db.loadTable(new Path(tbd.getSourceDir()), tbd.getTable().getTableName(), tbd.getReplace(), new Path(tbd.getTmpDir()));
- if (work.getOutputs() != null)
+ if (tbd.getPartitionSpec().size() == 0) {
+ db.loadTable(new Path(tbd.getSourceDir()), tbd.getTable()
+ .getTableName(), tbd.getReplace(), new Path(tbd.getTmpDir()));
+ if (work.getOutputs() != null) {
work.getOutputs().add(new WriteEntity(table));
+ }
} else {
LOG.info("Partition is: " + tbd.getPartitionSpec().toString());
- db.loadPartition(new Path(tbd.getSourceDir()), tbd.getTable().getTableName(),
- tbd.getPartitionSpec(), tbd.getReplace(), new Path(tbd.getTmpDir()));
- Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
- if (work.getOutputs() != null)
+ db.loadPartition(new Path(tbd.getSourceDir()), tbd.getTable()
+ .getTableName(), tbd.getPartitionSpec(), tbd.getReplace(),
+ new Path(tbd.getTmpDir()));
+ Partition partn = db.getPartition(table, tbd.getPartitionSpec(),
+ false);
+ if (work.getOutputs() != null) {
work.getOutputs().add(new WriteEntity(partn));
+ }
}
}
return 0;
- }
- catch (Exception e) {
- console.printError("Failed with exception " + e.getMessage(), "\n" + StringUtils.stringifyException(e));
+ } catch (Exception e) {
+ console.printError("Failed with exception " + e.getMessage(), "\n"
+ + StringUtils.stringifyException(e));
return (1);
}
}
@@ -162,21 +182,23 @@
*/
public boolean isLocal() {
loadTableDesc tbd = work.getLoadTableWork();
- if (tbd != null)
+ if (tbd != null) {
return false;
+ }
loadFileDesc lfd = work.getLoadFileWork();
if (lfd != null) {
if (lfd.getIsDfsDir()) {
return false;
- }
- else
+ } else {
return true;
+ }
}
return false;
}
+ @Override
public int getType() {
return StageType.MOVE;
}
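
The DFS branch of execute() above reduces to a small Hadoop FileSystem idiom: clobber the target, rename the source over it if it exists, otherwise create an empty directory so downstream readers see an empty result. A sketch with hypothetical paths:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class MoveSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path source = new Path("/tmp/hive-scratch/query-out"); // hypothetical
    Path target = new Path("/user/hive/warehouse/t");      // hypothetical

    fs.delete(target, true); // delete the output directory if it exists
    if (fs.exists(source)) {
      if (!fs.rename(source, target)) {
        throw new RuntimeException("Unable to rename: " + source + " to: "
            + target);
      }
    } else if (!fs.mkdirs(target)) { // empty result set: empty directory
      throw new RuntimeException("Unable to make directory: " + target);
    }
  }
}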
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/NumericOpMethodResolver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/NumericOpMethodResolver.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/NumericOpMethodResolver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/NumericOpMethodResolver.java Thu Jan 21 10:37:58 2010
@@ -23,23 +23,20 @@
import java.util.Arrays;
import java.util.List;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
/**
- * The class implements the method resolution for operators like
- * (+, -, *, %). The resolution logic is as follows:
+ * The class implements the method resolution for operators like (+, -, *, %).
+ * The resolution logic is as follows:
*
- * 1. If one of the parameters is a string, then it resolves to
- * evaluate(double, double)
- * 2. If one of the parameters is null, then it resolves to evaluate(T, T)
- * where T is the other non-null parameter type.
- * 3. If both of the parameters are null, then it resolves to
- * evaluate(byte, byte)
- * 4. Otherwise, it resolves to evaluate(T, T), where T is the type resulting
- * from calling FunctionRegistry.getCommonClass() on the two arguments.
+ * 1. If one of the parameters is a string, then it resolves to
+ *    evaluate(double, double).
+ * 2. If one of the parameters is null, then it resolves to evaluate(T, T)
+ *    where T is the other non-null parameter type.
+ * 3. If both of the parameters are null, then it resolves to
+ *    evaluate(byte, byte).
+ * 4. Otherwise, it resolves to evaluate(T, T), where T is the type resulting
+ *    from calling FunctionRegistry.getCommonClass() on the two arguments.
*/
public class NumericOpMethodResolver implements UDFMethodResolver {
@@ -47,65 +44,68 @@
* The udfclass for which resolution is needed.
*/
Class<? extends UDF> udfClass;
-
+
/**
* Constuctor.
*/
public NumericOpMethodResolver(Class<? extends UDF> udfClass) {
this.udfClass = udfClass;
}
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.hive.ql.exec.UDFMethodResolver#getEvalMethod(java.util.List)
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.hive.ql.exec.UDFMethodResolver#getEvalMethod(java.util
+ * .List)
*/
@Override
public Method getEvalMethod(List<TypeInfo> argTypeInfos)
- throws AmbiguousMethodException, UDFArgumentException {
- assert(argTypeInfos.size() == 2);
+ throws AmbiguousMethodException, UDFArgumentException {
+ assert (argTypeInfos.size() == 2);
List<TypeInfo> pTypeInfos = null;
List<TypeInfo> modArgTypeInfos = new ArrayList<TypeInfo>();
// If either argument is a string, we convert to a double because a number
// in string form should always be convertible into a double
- if (argTypeInfos.get(0).equals(TypeInfoFactory.stringTypeInfo) ||
- argTypeInfos.get(1).equals(TypeInfoFactory.stringTypeInfo) ) {
+ if (argTypeInfos.get(0).equals(TypeInfoFactory.stringTypeInfo)
+ || argTypeInfos.get(1).equals(TypeInfoFactory.stringTypeInfo)) {
modArgTypeInfos.add(TypeInfoFactory.doubleTypeInfo);
modArgTypeInfos.add(TypeInfoFactory.doubleTypeInfo);
} else {
// If it's a void, we change the type to a byte because once the types
// are run through getCommonClass(), a byte and any other type T will
// resolve to type T
- for(int i=0; i<2; i++) {
- if(argTypeInfos.get(i).equals(TypeInfoFactory.voidTypeInfo)) {
- modArgTypeInfos.add(TypeInfoFactory.byteTypeInfo);
+ for (int i = 0; i < 2; i++) {
+ if (argTypeInfos.get(i).equals(TypeInfoFactory.voidTypeInfo)) {
+ modArgTypeInfos.add(TypeInfoFactory.byteTypeInfo);
} else {
modArgTypeInfos.add(argTypeInfos.get(i));
}
}
}
-
- TypeInfo commonType = FunctionRegistry.getCommonClass(
- modArgTypeInfos.get(0),
- modArgTypeInfos.get(1));
-
- if(commonType == null) {
- throw new UDFArgumentException("Unable to find a common class between" +
- "types " + modArgTypeInfos.get(0).getTypeName() +
- " and " + modArgTypeInfos.get(1).getTypeName());
+
+ TypeInfo commonType = FunctionRegistry.getCommonClass(modArgTypeInfos
+ .get(0), modArgTypeInfos.get(1));
+
+ if (commonType == null) {
+ throw new UDFArgumentException("Unable to find a common class between"
+ + "types " + modArgTypeInfos.get(0).getTypeName() + " and "
+ + modArgTypeInfos.get(1).getTypeName());
}
-
+
pTypeInfos = new ArrayList<TypeInfo>();
pTypeInfos.add(commonType);
pTypeInfos.add(commonType);
Method udfMethod = null;
- for(Method m: Arrays.asList(udfClass.getMethods())) {
+ for (Method m : Arrays.asList(udfClass.getMethods())) {
if (m.getName().equals("evaluate")) {
- List<TypeInfo> argumentTypeInfos = TypeInfoUtils.getParameterTypeInfos(m,
- pTypeInfos.size());
+ List<TypeInfo> argumentTypeInfos = TypeInfoUtils.getParameterTypeInfos(
+ m, pTypeInfos.size());
if (argumentTypeInfos == null) {
// null means the method does not accept number of arguments passed.
continue;
@@ -113,7 +113,7 @@
boolean match = (argumentTypeInfos.size() == pTypeInfos.size());
- for(int i=0; i<pTypeInfos.size() && match; i++) {
+ for (int i = 0; i < pTypeInfos.size() && match; i++) {
TypeInfo accepted = argumentTypeInfos.get(i);
if (!accepted.equals(pTypeInfos.get(i))) {
match = false;
@@ -123,13 +123,12 @@
if (match) {
if (udfMethod != null) {
throw new AmbiguousMethodException(udfClass, argTypeInfos);
- }
- else {
+ } else {
udfMethod = m;
}
}
}
}
- return udfMethod;
+ return udfMethod;
}
}
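
Once the two argument types are coerced to a common type, getEvalMethod() is plain reflection: find the single evaluate() overload whose parameter types match, and treat two matches as ambiguous. A self-contained sketch of that loop (DemoUDF and its overloads are invented for illustration):

import java.lang.reflect.Method;
import java.util.Arrays;

public class ResolveSketch {
  public static class DemoUDF {
    public double evaluate(double a, double b) { return a + b; }
    public int evaluate(int a, int b) { return a + b; }
  }

  public static void main(String[] args) throws Exception {
    // e.g. two string arguments, coerced to (double, double) by the rules above
    Class<?>[] wanted = { double.class, double.class };
    Method match = null;
    for (Method m : DemoUDF.class.getMethods()) {
      if (m.getName().equals("evaluate")
          && Arrays.equals(m.getParameterTypes(), wanted)) {
        if (match != null) {
          throw new IllegalStateException("ambiguous evaluate() overloads");
        }
        match = m;
      }
    }
    System.out.println("resolved: " + match);
    System.out.println(match.invoke(new DemoUDF(), 1.5, 2.5)); // prints 4.0
  }
}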
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/NumericUDAF.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/NumericUDAF.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/NumericUDAF.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/NumericUDAF.java Thu Jan 21 10:37:58 2010
@@ -19,7 +19,8 @@
package org.apache.hadoop.hive.ql.exec;
/**
- * Base class of numeric UDAFs like sum and avg which need a NumericUDAFEvaluatorResolver.
+ * Base class of numeric UDAFs like sum and avg which need a
+ * NumericUDAFEvaluatorResolver.
*/
public class NumericUDAF extends UDAF {
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/NumericUDAFEvaluatorResolver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/NumericUDAFEvaluatorResolver.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/NumericUDAFEvaluatorResolver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/NumericUDAFEvaluatorResolver.java Thu Jan 21 10:37:58 2010
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.exec;
-import java.sql.Date;
import java.util.ArrayList;
import java.util.List;
@@ -26,8 +25,9 @@
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
/**
- * Resolver for Numeric UDAFs like sum and avg. If the input argument is string or date,
- * the resolver returns the evaluator whose iterate function operates on doubles.
+ * Resolver for Numeric UDAFs like sum and avg. If the input argument is string
+ * or date, the resolver returns the evaluator whose iterate function operates
+ * on doubles.
*/
public class NumericUDAFEvaluatorResolver extends DefaultUDAFEvaluatorResolver {
@@ -37,16 +37,21 @@
public NumericUDAFEvaluatorResolver(Class<? extends UDAF> udafClass) {
super(udafClass);
}
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.hive.ql.exec.UDAFMethodResolver#getEvaluatorClass(java.util.List)
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.hive.ql.exec.UDAFMethodResolver#getEvaluatorClass(java
+ * .util.List)
*/
@Override
public Class<? extends UDAFEvaluator> getEvaluatorClass(
List<TypeInfo> argTypeInfos) throws AmbiguousMethodException {
- // Go through the argClasses and for any string, void or date time, start looking for doubles
+ // Go through the argClasses and for any string, void or date time, start
+ // looking for doubles
ArrayList<TypeInfo> args = new ArrayList<TypeInfo>();
- for(TypeInfo arg: argTypeInfos) {
+ for (TypeInfo arg : argTypeInfos) {
if (arg.equals(TypeInfoFactory.voidTypeInfo)
|| arg.equals(TypeInfoFactory.stringTypeInfo)) {
args.add(TypeInfoFactory.doubleTypeInfo);
@@ -54,7 +59,7 @@
args.add(arg);
}
}
-
+
return super.getEvaluatorClass(args);
}
}
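
The rule this resolver (and NumericOpMethodResolver above) applies before lookup can be stated as a tiny function: string and void argument types are rewritten to double, everything else passes through. A sketch using plain type-name strings in place of Hive's TypeInfo:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CoerceSketch {
  static List<String> coerceForNumericUdaf(List<String> argTypes) {
    List<String> out = new ArrayList<String>();
    for (String t : argTypes) {
      if ("string".equals(t) || "void".equals(t)) {
        out.add("double"); // a number in string form is always a valid double
      } else {
        out.add(t);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(coerceForNumericUdaf(Arrays.asList("string", "int")));
    // prints [double, int]
  }
}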
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Thu Jan 21 10:37:58 2010
@@ -45,7 +45,8 @@
/**
* Base operator implementation
**/
-public abstract class Operator <T extends Serializable> implements Serializable, Node {
+public abstract class Operator<T extends Serializable> implements Serializable,
+ Node {
// Bean methods
@@ -55,42 +56,44 @@
protected List<Operator<? extends Serializable>> parentOperators;
protected String operatorId;
/**
- * List of counter names associated with the operator
- * It contains the following default counters
- * NUM_INPUT_ROWS
- * NUM_OUTPUT_ROWS
- * TIME_TAKEN
+ * List of counter names associated with the operator. It contains the
+ * following default counters: NUM_INPUT_ROWS, NUM_OUTPUT_ROWS, TIME_TAKEN.
* Individual operators can add to this list via addToCounterNames methods
*/
protected ArrayList<String> counterNames;
/**
* Each operator has its own map of its counter names to disjoint
- * ProgressCounter - it is populated at compile time and is read in
- * at run-time while extracting the operator specific counts
+ * ProgressCounter - it is populated at compile time and is read in at
+ * run-time while extracting the operator specific counts
*/
protected HashMap<String, ProgressCounter> counterNameToEnum;
-
private static int seqId;
- // It can be optimized later so that an operator operator (init/close) is performed
- // only after that operation has been performed on all the parents. This will require
- // initializing the whole tree in all the mappers (which might be required for mappers
+ // It can be optimized later so that an operator operation (init/close) is
+ // performed
+ // only after that operation has been performed on all the parents. This will
+ // require
+ // initializing the whole tree in all the mappers (which might be required for
+ // mappers
// spanning multiple files anyway, in future)
public static enum State {
- UNINIT, // initialize() has not been called
- INIT, // initialize() has been called and close() has not been called,
- // or close() has been called but one of its parent is not closed.
- CLOSE // all its parents operators are in state CLOSE and called close()
- // to children. Note: close() being called and its state being CLOSE is
- // difference since close() could be called but state is not CLOSE if
- // one of its parent is not in state CLOSE..
+ UNINIT, // initialize() has not been called
+ INIT, // initialize() has been called and close() has not been called,
+ // or close() has been called but one of its parents is not closed.
+ CLOSE
+ // all its parent operators are in state CLOSE and it has called close()
+ // on its children. Note: close() having been called and the state being
+ // CLOSE are different, since close() may have been called while the state
+ // is not CLOSE because one of its parents is not yet in state CLOSE.
};
+
transient protected State state = State.UNINIT;
- transient static boolean fatalError = false; // fatalError is shared acorss all operators
-
+ transient static boolean fatalError = false; // fatalError is shared across
+ // all operators
+
static {
seqId = 0;
}
@@ -102,17 +105,20 @@
public static void resetId() {
seqId = 0;
}
-
+
/**
* Create an operator with a reporter.
- * @param reporter Used to report progress of certain operators.
+ *
+ * @param reporter
+ * Used to report progress of certain operators.
*/
public Operator(Reporter reporter) {
this.reporter = reporter;
id = String.valueOf(seqId++);
}
- public void setChildOperators(List<Operator<? extends Serializable>> childOperators) {
+ public void setChildOperators(
+ List<Operator<? extends Serializable>> childOperators) {
this.childOperators = childOperators;
}
@@ -130,14 +136,15 @@
}
Vector<Node> ret_vec = new Vector<Node>();
- for(Operator<? extends Serializable> op: getChildOperators()) {
+ for (Operator<? extends Serializable> op : getChildOperators()) {
ret_vec.add(op);
}
return ret_vec;
}
- public void setParentOperators(List<Operator<? extends Serializable>> parentOperators) {
+ public void setParentOperators(
+ List<Operator<? extends Serializable>> parentOperators) {
this.parentOperators = parentOperators;
}
@@ -178,7 +185,7 @@
// non-bean ..
- transient protected HashMap<Enum<?>, LongWritable> statsMap = new HashMap<Enum<?>, LongWritable> ();
+ transient protected HashMap<Enum<?>, LongWritable> statsMap = new HashMap<Enum<?>, LongWritable>();
transient protected OutputCollector out;
transient protected Log LOG = LogFactory.getLog(this.getClass().getName());
transient protected String alias;
@@ -190,9 +197,9 @@
transient protected ObjectInspector outputObjInspector;
/**
- * A map of output column name to input expression map. This is used by optimizer
- * and built during semantic analysis
- * contains only key elements for reduce sink and group by op
+ * A map of output column name to input expression map. This is used by the
+ * optimizer and built during semantic analysis; it contains only key
+ * elements for the reduce sink and group by operators.
*/
protected transient Map<String, exprNodeDesc> colExprMap;
@@ -201,21 +208,24 @@
}
/**
- * This function is not named getId(), to make sure java serialization
- * does NOT serialize it. Some TestParse tests will fail if we serialize
- * this field, since the Operator ID will change based on the number of
- * query tests.
+ * This function is not named getId(), to make sure java serialization does
+ * NOT serialize it. Some TestParse tests will fail if we serialize this
+ * field, since the Operator ID will change based on the number of query
+ * tests.
*/
- public String getIdentifier() { return id; }
+ public String getIdentifier() {
+ return id;
+ }
public void setReporter(Reporter rep) {
reporter = rep;
// the collector is same across all operators
- if(childOperators == null)
+ if (childOperators == null) {
return;
+ }
- for(Operator<? extends Serializable> op: childOperators) {
+ for (Operator<? extends Serializable> op : childOperators) {
op.setReporter(rep);
}
}
@@ -224,10 +234,11 @@
this.out = out;
// the collector is same across all operators
- if(childOperators == null)
+ if (childOperators == null) {
return;
+ }
- for(Operator<? extends Serializable> op: childOperators) {
+ for (Operator<? extends Serializable> op : childOperators) {
op.setOutputCollector(out);
}
}
@@ -238,31 +249,34 @@
public void setAlias(String alias) {
this.alias = alias;
- if(childOperators == null)
+ if (childOperators == null) {
return;
+ }
- for(Operator<? extends Serializable> op: childOperators) {
+ for (Operator<? extends Serializable> op : childOperators) {
op.setAlias(alias);
}
}
public Map<Enum<?>, Long> getStats() {
- HashMap<Enum<?>, Long> ret = new HashMap<Enum<?>, Long> ();
- for(Enum<?> one: statsMap.keySet()) {
+ HashMap<Enum<?>, Long> ret = new HashMap<Enum<?>, Long>();
+ for (Enum<?> one : statsMap.keySet()) {
ret.put(one, Long.valueOf(statsMap.get(one).get()));
}
- return(ret);
+ return (ret);
}
/**
* checks whether all parent operators are initialized or not
- * @return true if there are no parents or all parents are initialized. false otherwise
+ *
+ * @return true if there are no parents or all parents are initialized. false
+ * otherwise
*/
protected boolean areAllParentsInitialized() {
if (parentOperators == null) {
return true;
}
- for(Operator<? extends Serializable> parent: parentOperators) {
+ for (Operator<? extends Serializable> parent : parentOperators) {
if (parent.state != State.INIT) {
return false;
}
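
This guard is what lets an operator with several parents (a join, say) be initialized by whichever parent finishes last: each parent calls initialize() on the shared child, and the call is a no-op until every parent has reached INIT. A toy model of the handshake (operator names invented):

public class InitGateSketch {
  enum State { UNINIT, INIT }

  static class Op {
    State state = State.UNINIT;
    Op[] parents;
    String name;
    Op(String name, Op... parents) { this.name = name; this.parents = parents; }

    boolean areAllParentsInitialized() {
      for (Op p : parents) {
        if (p.state != State.INIT) {
          return false;
        }
      }
      return true;
    }

    void initialize() {
      if (state == State.INIT || !areAllParentsInitialized()) {
        return; // wait for the remaining parents
      }
      state = State.INIT;
      System.out.println("initialized " + name);
    }
  }

  public static void main(String[] args) {
    Op p1 = new Op("p1"), p2 = new Op("p2");
    Op child = new Op("join", p1, p2);
    p1.initialize();
    child.initialize(); // no-op: p2 is still UNINIT
    p2.initialize();
    child.initialize(); // now proceeds
  }
}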
@@ -271,46 +285,51 @@
}
/**
- * Initializes operators only if all parents have been initialized.
- * Calls operator specific initializer which then initializes child ops.
- *
+ * Initializes operators only if all parents have been initialized. Calls
+ * operator specific initializer which then initializes child ops.
+ *
* @param hconf
- * @param inputOIs input object inspector array indexes by tag id. null value is ignored.
+ * @param inputOIs
+ * input object inspector array indexed by tag id. null values are
+ * ignored.
* @throws HiveException
*/
- public void initialize(Configuration hconf, ObjectInspector[] inputOIs) throws HiveException {
+ public void initialize(Configuration hconf, ObjectInspector[] inputOIs)
+ throws HiveException {
if (state == State.INIT) {
return;
}
- if(!areAllParentsInitialized()) {
+ if (!areAllParentsInitialized()) {
return;
}
-
+
LOG.info("Initializing Self " + id + " " + getName());
if (inputOIs != null) {
inputObjInspectors = inputOIs;
}
- // initialize structure to maintain child op info. operator tree changes while
+ // initialize structure to maintain child op info. operator tree changes
+ // while
// initializing, so this needs to be done here instead of in initialize()
if (childOperators != null) {
childOperatorsArray = new Operator[childOperators.size()];
- for (int i=0; i<childOperatorsArray.length; i++) {
+ for (int i = 0; i < childOperatorsArray.length; i++) {
childOperatorsArray[i] = childOperators.get(i);
}
childOperatorsTag = new int[childOperatorsArray.length];
- for (int i=0; i<childOperatorsArray.length; i++) {
- List<Operator<? extends Serializable>> parentOperators =
- childOperatorsArray[i].getParentOperators();
+ for (int i = 0; i < childOperatorsArray.length; i++) {
+ List<Operator<? extends Serializable>> parentOperators = childOperatorsArray[i]
+ .getParentOperators();
if (parentOperators == null) {
throw new HiveException("Hive internal error: parent is null in "
+ childOperatorsArray[i].getClass() + "!");
}
childOperatorsTag[i] = parentOperators.indexOf(this);
if (childOperatorsTag[i] == -1) {
- throw new HiveException("Hive internal error: cannot find parent in the child operator!");
+ throw new HiveException(
+ "Hive internal error: cannot find parent in the child operator!");
}
}
}
@@ -333,7 +352,8 @@
}
/**
- * Calls initialize on each of the children with outputObjetInspector as the output row format
+ * Calls initialize on each of the children with outputObjInspector as the
+ * output row format
*/
protected void initializeChildren(Configuration hconf) throws HiveException {
state = State.INIT;
@@ -343,45 +363,59 @@
}
LOG.info("Initializing children of " + id + " " + getName());
for (int i = 0; i < childOperatorsArray.length; i++) {
- childOperatorsArray[i].initialize(hconf, outputObjInspector, childOperatorsTag[i]);
- if ( reporter != null ) {
+ childOperatorsArray[i].initialize(hconf, outputObjInspector,
+ childOperatorsTag[i]);
+ if (reporter != null) {
childOperatorsArray[i].setReporter(reporter);
}
}
}
/**
- * Collects all the parent's output object inspectors and calls actual initialization method
+ * Collects all the parents' output object inspectors and calls the actual
+ * initialization method
+ *
* @param hconf
- * @param inputOI OI of the row that this parent will pass to this op
- * @param parentId parent operator id
+ * @param inputOI
+ * OI of the row that this parent will pass to this op
+ * @param parentId
+ * parent operator id
* @throws HiveException
*/
- private void initialize(Configuration hconf, ObjectInspector inputOI, int parentId) throws HiveException {
+ private void initialize(Configuration hconf, ObjectInspector inputOI,
+ int parentId) throws HiveException {
LOG.info("Initializing child " + id + " " + getName());
inputObjInspectors[parentId] = inputOI;
// call the actual operator initialization function
initialize(hconf, null);
}
-
- /**
+ /**
* Process the row.
- * @param row The object representing the row.
- * @param tag The tag of the row usually means which parent this row comes from.
- * Rows with the same tag should have exactly the same rowInspector all the time.
+ *
+ * @param row
+ * The object representing the row.
+ * @param tag
+ * The tag of the row usually means which parent this row comes from.
+ * Rows with the same tag should have exactly the same rowInspector
+ * all the time.
*/
public abstract void processOp(Object row, int tag) throws HiveException;
/**
* Process the row.
- * @param row The object representing the row.
- * @param tag The tag of the row usually means which parent this row comes from.
- * Rows with the same tag should have exactly the same rowInspector all the time.
+ *
+ * @param row
+ * The object representing the row.
+ * @param tag
+ * The tag of the row usually means which parent this row comes from.
+ * Rows with the same tag should have exactly the same rowInspector
+ * all the time.
*/
public void process(Object row, int tag) throws HiveException {
- if ( fatalError )
+ if (fatalError) {
return;
+ }
preProcessCounter();
processOp(row, tag);
postProcessCounter();
@@ -391,15 +425,18 @@
public void startGroup() throws HiveException {
LOG.debug("Starting group");
- if (childOperators == null)
+ if (childOperators == null) {
return;
-
- if ( fatalError )
+ }
+
+ if (fatalError) {
return;
+ }
LOG.debug("Starting group for children:");
- for (Operator<? extends Serializable> op: childOperators)
+ for (Operator<? extends Serializable> op : childOperators) {
op.startGroup();
+ }
LOG.debug("Start group Done");
}
@@ -408,24 +445,26 @@
public void endGroup() throws HiveException {
LOG.debug("Ending group");
- if (childOperators == null)
+ if (childOperators == null) {
return;
+ }
- if ( fatalError )
+ if (fatalError) {
return;
+ }
LOG.debug("Ending group for children:");
- for (Operator<? extends Serializable> op: childOperators)
+ for (Operator<? extends Serializable> op : childOperators) {
op.endGroup();
+ }
LOG.debug("End group Done");
}
private boolean allInitializedParentsAreClosed() {
if (parentOperators != null) {
- for(Operator<? extends Serializable> parent: parentOperators) {
- if (!(parent.state == State.CLOSE ||
- parent.state == State.UNINIT)) {
+ for (Operator<? extends Serializable> parent : parentOperators) {
+ if (!(parent.state == State.CLOSE || parent.state == State.UNINIT)) {
return false;
}
}
@@ -438,12 +477,14 @@
// more than 1 thread should call this close() function.
public void close(boolean abort) throws HiveException {
- if (state == State.CLOSE)
+ if (state == State.CLOSE) {
return;
+ }
// check if all parents are finished
- if (!allInitializedParentsAreClosed())
+ if (!allInitializedParentsAreClosed()) {
return;
+ }
// set state as CLOSE as long as all parents are closed
// state == CLOSE doesn't mean all children are also in state CLOSE
@@ -463,10 +504,11 @@
try {
logStats();
- if(childOperators == null)
+ if (childOperators == null) {
return;
+ }
- for(Operator<? extends Serializable> op: childOperators) {
+ for (Operator<? extends Serializable> op : childOperators) {
op.close(abort);
}
@@ -478,33 +520,35 @@
}
/**
- * Operator specific close routine. Operators which inherents this
- * class should overwrite this funtion for their specific cleanup
- * routine.
+ * Operator specific close routine. Operators which inherit this class
+ * should override this function for their specific cleanup routine.
*/
protected void closeOp(boolean abort) throws HiveException {
}
-
/**
* Unlike other operator interfaces which are called from map or reduce task,
* jobClose is called from the jobclient side once the job has completed
- *
- * @param conf Configuration with with which job was submitted
- * @param success whether the job was completed successfully or not
+ *
+ * @param conf
+ * Configuration with which the job was submitted
+ * @param success
+ * whether the job was completed successfully or not
*/
- public void jobClose(Configuration conf, boolean success) throws HiveException {
- if(childOperators == null)
+ public void jobClose(Configuration conf, boolean success)
+ throws HiveException {
+ if (childOperators == null) {
return;
+ }
- for(Operator<? extends Serializable> op: childOperators) {
+ for (Operator<? extends Serializable> op : childOperators) {
op.jobClose(conf, success);
}
}
/**
- * Cache childOperators in an array for faster access. childOperatorsArray is accessed
- * per row, so it's important to make the access efficient.
+ * Cache childOperators in an array for faster access. childOperatorsArray is
+ * accessed per row, so it's important to make the access efficient.
*/
transient protected Operator<? extends Serializable>[] childOperatorsArray = null;
transient protected int[] childOperatorsTag;
@@ -513,54 +557,69 @@
transient private long cntr = 0;
transient private long nextCntr = 1;
- /**
- * Replace one child with another at the same position. The parent of the child is not changed
- * @param child the old child
- * @param newChild the new child
+ /**
+ * Replace one child with another at the same position. The parent of the
+ * child is not changed
+ *
+ * @param child
+ * the old child
+ * @param newChild
+ * the new child
*/
- public void replaceChild(Operator<? extends Serializable> child, Operator<? extends Serializable> newChild) {
+ public void replaceChild(Operator<? extends Serializable> child,
+ Operator<? extends Serializable> newChild) {
int childIndex = childOperators.indexOf(child);
assert childIndex != -1;
childOperators.set(childIndex, newChild);
}
- public void removeChild(Operator<? extends Serializable> child) {
+ public void removeChild(Operator<? extends Serializable> child) {
int childIndex = childOperators.indexOf(child);
assert childIndex != -1;
- if (childOperators.size() == 1)
+ if (childOperators.size() == 1) {
childOperators = null;
- else
+ } else {
childOperators.remove(childIndex);
+ }
int parentIndex = child.getParentOperators().indexOf(this);
assert parentIndex != -1;
- if (child.getParentOperators().size() == 1)
+ if (child.getParentOperators().size() == 1) {
child.setParentOperators(null);
- else
+ } else {
child.getParentOperators().remove(parentIndex);
+ }
}
/**
- * Replace one parent with another at the same position. Chilren of the new parent are not updated
- * @param parent the old parent
- * @param newParent the new parent
+ * Replace one parent with another at the same position. Children of the new
+ * parent are not updated
+ *
+ * @param parent
+ * the old parent
+ * @param newParent
+ * the new parent
*/
- public void replaceParent(Operator<? extends Serializable> parent, Operator<? extends Serializable> newParent) {
+ public void replaceParent(Operator<? extends Serializable> parent,
+ Operator<? extends Serializable> newParent) {
int parentIndex = parentOperators.indexOf(parent);
assert parentIndex != -1;
parentOperators.set(parentIndex, newParent);
}
private long getNextCntr(long cntr) {
- // A very simple counter to keep track of number of rows processed by an operator. It dumps
+ // A very simple counter to keep track of number of rows processed by an
+ // operator. It dumps
// every 1 million times, and quickly before that
- if (cntr >= 1000000)
+ if (cntr >= 1000000) {
return cntr + 1000000;
+ }
return 10 * cntr;
}
- protected void forward(Object row, ObjectInspector rowInspector) throws HiveException {
+ protected void forward(Object row, ObjectInspector rowInspector)
+ throws HiveException {
if ((++outputRows % 1000) == 0) {
if (counterNameToEnum != null) {
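
The thresholds getNextCntr() produces are worth seeing concretely: forward() logs at 1, 10, 100, ... up to 1,000,000 rows processed, then every further million. A tiny driver around the same logic, copied from the method above:

public class CntrSketch {
  static long getNextCntr(long cntr) {
    if (cntr >= 1000000) {
      return cntr + 1000000;
    }
    return 10 * cntr;
  }

  public static void main(String[] args) {
    long next = 1;
    for (int i = 0; i < 10; i++) {
      System.out.println(next); // 1 10 100 ... 1000000 2000000 3000000 4000000
      next = getNextCntr(next);
    }
  }
}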
@@ -578,14 +637,17 @@
}
// For debugging purposes:
- // System.out.println("" + this.getClass() + ": " + SerDeUtils.getJSONString(row, rowInspector));
- // System.out.println("" + this.getClass() + ">> " + ObjectInspectorUtils.getObjectInspectorName(rowInspector));
+ // System.out.println("" + this.getClass() + ": " +
+ // SerDeUtils.getJSONString(row, rowInspector));
+ // System.out.println("" + this.getClass() + ">> " +
+ // ObjectInspectorUtils.getObjectInspectorName(rowInspector));
if (childOperatorsArray == null && childOperators != null) {
- throw new HiveException("Internal Hive error during operator initialization.");
+ throw new HiveException(
+ "Internal Hive error during operator initialization.");
}
- if((childOperatorsArray == null) || (getDone())) {
+ if ((childOperatorsArray == null) || (getDone())) {
return;
}
@@ -593,7 +655,7 @@
for (int i = 0; i < childOperatorsArray.length; i++) {
Operator<? extends Serializable> o = childOperatorsArray[i];
if (o.getDone()) {
- childrenDone ++;
+ childrenDone++;
} else {
o.process(row, childOperatorsTag[i]);
}
@@ -606,7 +668,7 @@
}
public void resetStats() {
- for(Enum<?> e: statsMap.keySet()) {
+ for (Enum<?> e : statsMap.keySet()) {
statsMap.get(e).set(0L);
}
}
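
The statsMap iterated here is the same structure MapOperator registers its DESERIALIZE_ERRORS counter into: one LongWritable per enum key, bumped in place and read back by getStats()/logStats(). A standalone sketch of the pattern:

import java.util.HashMap;

import org.apache.hadoop.io.LongWritable;

public class StatsSketch {
  enum Counter { DESERIALIZE_ERRORS }

  static HashMap<Enum<?>, LongWritable> statsMap =
      new HashMap<Enum<?>, LongWritable>();

  public static void main(String[] args) {
    LongWritable errors = new LongWritable();
    statsMap.put(Counter.DESERIALIZE_ERRORS, errors);
    errors.set(errors.get() + 1); // what MapOperator's catch block does
    for (Enum<?> e : statsMap.keySet()) {
      System.out.println(e + ":" + statsMap.get(e)); // logStats() equivalent
    }
    for (Enum<?> e : statsMap.keySet()) {
      statsMap.get(e).set(0L); // resetStats()
    }
  }
}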
@@ -615,23 +677,24 @@
public void func(Operator<? extends Serializable> op);
}
- public void preorderMap (OperatorFunc opFunc) {
+ public void preorderMap(OperatorFunc opFunc) {
opFunc.func(this);
- if(childOperators != null) {
- for(Operator<? extends Serializable> o: childOperators) {
+ if (childOperators != null) {
+ for (Operator<? extends Serializable> o : childOperators) {
o.preorderMap(opFunc);
}
}
}
- public void logStats () {
- for(Enum<?> e: statsMap.keySet()) {
+ public void logStats() {
+ for (Enum<?> e : statsMap.keySet()) {
LOG.info(e.toString() + ":" + statsMap.get(e).toString());
}
}
/**
* Implements the getName function for the Node Interface.
+ *
* @return the name of the operator
*/
public String getName() {
@@ -639,8 +702,9 @@
}
/**
- * Returns a map of output column name to input expression map
- * Note that currently it returns only key columns for ReduceSink and GroupBy operators
+ * Returns a map of output column name to input expression map. Note that
+ * currently it returns only key columns for ReduceSink and GroupBy operators.
+ *
* @return null if the operator doesn't change columns
*/
public Map<String, exprNodeDesc> getColumnExprMap() {
@@ -657,7 +721,7 @@
}
StringBuilder s = new StringBuilder();
s.append("\n");
- while(level > 0) {
+ while (level > 0) {
s.append(" ");
level--;
}
@@ -669,8 +733,9 @@
}
public String dump(int level, HashSet<Integer> seenOpts) {
- if ( seenOpts.contains(new Integer(id)))
+ if (seenOpts.contains(new Integer(id))) {
return null;
+ }
seenOpts.add(new Integer(id));
StringBuilder s = new StringBuilder();
@@ -683,7 +748,7 @@
s.append(ls);
s.append(" <Children>");
for (Operator<? extends Serializable> o : childOperators) {
- s.append(o.dump(level+2, seenOpts));
+ s.append(o.dump(level + 2, seenOpts));
}
s.append(ls);
s.append(" <\\Children>");
@@ -694,7 +759,7 @@
s.append(" <Parent>");
for (Operator<? extends Serializable> o : parentOperators) {
s.append("Id = " + o.id + " ");
- s.append(o.dump(level,seenOpts));
+ s.append(o.dump(level, seenOpts));
}
s.append("<\\Parent>");
}
@@ -711,7 +776,7 @@
protected static ObjectInspector[] initEvaluators(ExprNodeEvaluator[] evals,
ObjectInspector rowInspector) throws HiveException {
ObjectInspector[] result = new ObjectInspector[evals.length];
- for (int i=0; i<evals.length; i++) {
+ for (int i = 0; i < evals.length; i++) {
result[i] = evals[i].initialize(rowInspector);
}
return result;
@@ -722,12 +787,12 @@
* StructObjectInspector with integer field names.
*/
protected static StructObjectInspector initEvaluatorsAndReturnStruct(
- ExprNodeEvaluator[] evals, List<String> outputColName, ObjectInspector rowInspector)
- throws HiveException {
- ObjectInspector[] fieldObjectInspectors = initEvaluators(evals, rowInspector);
+ ExprNodeEvaluator[] evals, List<String> outputColName,
+ ObjectInspector rowInspector) throws HiveException {
+ ObjectInspector[] fieldObjectInspectors = initEvaluators(evals,
+ rowInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(
- outputColName,
- Arrays.asList(fieldObjectInspectors));
+ outputColName, Arrays.asList(fieldObjectInspectors));
}
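A hedged sketch of what the returned struct inspector looks like, assuming the Hive serde2 classes are on the classpath; the column names (_col0, _col1) and field types below are illustrative stand-ins for whatever the initialized evaluators actually produce:

    import java.util.Arrays;

    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    public class StructInspectorDemo {
      public static void main(String[] args) {
        // Stand-ins for the inspectors initEvaluators would return once each
        // ExprNodeEvaluator has been initialized against the input row.
        ObjectInspector[] fieldObjectInspectors = {
            PrimitiveObjectInspectorFactory.javaStringObjectInspector,
            PrimitiveObjectInspectorFactory.javaLongObjectInspector };
        StructObjectInspector soi = ObjectInspectorFactory
            .getStandardStructObjectInspector(Arrays.asList("_col0", "_col1"),
                Arrays.asList(fieldObjectInspectors));
        // e.g. struct<_col0:string,_col1:bigint>
        System.out.println(soi.getTypeName());
      }
    }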
/**
@@ -738,46 +803,7 @@
* TODO This is a hack for hadoop 0.17 which only supports enum counters
*/
public static enum ProgressCounter {
- C1, C2, C3, C4, C5, C6, C7, C8, C9, C10,
- C11, C12, C13, C14, C15, C16, C17, C18, C19, C20,
- C21, C22, C23, C24, C25, C26, C27, C28, C29, C30,
- C31, C32, C33, C34, C35, C36, C37, C38, C39, C40,
- C41, C42, C43, C44, C45, C46, C47, C48, C49, C50,
- C51, C52, C53, C54, C55, C56, C57, C58, C59, C60,
- C61, C62, C63, C64, C65, C66, C67, C68, C69, C70,
- C71, C72, C73, C74, C75, C76, C77, C78, C79, C80,
- C81, C82, C83, C84, C85, C86, C87, C88, C89, C90,
- C91, C92, C93, C94, C95, C96, C97, C98, C99, C100,
- C101, C102, C103, C104, C105, C106, C107, C108, C109, C110,
- C111, C112, C113, C114, C115, C116, C117, C118, C119, C120,
- C121, C122, C123, C124, C125, C126, C127, C128, C129, C130,
- C131, C132, C133, C134, C135, C136, C137, C138, C139, C140,
- C141, C142, C143, C144, C145, C146, C147, C148, C149, C150,
- C151, C152, C153, C154, C155, C156, C157, C158, C159, C160,
- C161, C162, C163, C164, C165, C166, C167, C168, C169, C170,
- C171, C172, C173, C174, C175, C176, C177, C178, C179, C180,
- C181, C182, C183, C184, C185, C186, C187, C188, C189, C190,
- C191, C192, C193, C194, C195, C196, C197, C198, C199, C200,
- C201, C202, C203, C204, C205, C206, C207, C208, C209, C210,
- C211, C212, C213, C214, C215, C216, C217, C218, C219, C220,
- C221, C222, C223, C224, C225, C226, C227, C228, C229, C230,
- C231, C232, C233, C234, C235, C236, C237, C238, C239, C240,
- C241, C242, C243, C244, C245, C246, C247, C248, C249, C250,
- C251, C252, C253, C254, C255, C256, C257, C258, C259, C260,
- C261, C262, C263, C264, C265, C266, C267, C268, C269, C270,
- C271, C272, C273, C274, C275, C276, C277, C278, C279, C280,
- C281, C282, C283, C284, C285, C286, C287, C288, C289, C290,
- C291, C292, C293, C294, C295, C296, C297, C298, C299, C300,
- C301, C302, C303, C304, C305, C306, C307, C308, C309, C310,
- C311, C312, C313, C314, C315, C316, C317, C318, C319, C320,
- C321, C322, C323, C324, C325, C326, C327, C328, C329, C330,
- C331, C332, C333, C334, C335, C336, C337, C338, C339, C340,
- C341, C342, C343, C344, C345, C346, C347, C348, C349, C350,
- C351, C352, C353, C354, C355, C356, C357, C358, C359, C360,
- C361, C362, C363, C364, C365, C366, C367, C368, C369, C370,
- C371, C372, C373, C374, C375, C376, C377, C378, C379, C380,
- C381, C382, C383, C384, C385, C386, C387, C388, C389, C390,
- C391, C392, C393, C394, C395, C396, C397, C398, C399, C400
+ C1, C2, C3, C4, C5, C6, C7, C8, C9, C10, C11, C12, C13, C14, C15, C16, C17, C18, C19, C20,
+ C21, C22, C23, C24, C25, C26, C27, C28, C29, C30, C31, C32, C33, C34, C35, C36, C37, C38, C39, C40,
+ C41, C42, C43, C44, C45, C46, C47, C48, C49, C50, C51, C52, C53, C54, C55, C56, C57, C58, C59, C60,
+ C61, C62, C63, C64, C65, C66, C67, C68, C69, C70, C71, C72, C73, C74, C75, C76, C77, C78, C79, C80,
+ C81, C82, C83, C84, C85, C86, C87, C88, C89, C90, C91, C92, C93, C94, C95, C96, C97, C98, C99, C100,
+ C101, C102, C103, C104, C105, C106, C107, C108, C109, C110, C111, C112, C113, C114, C115, C116, C117, C118, C119, C120,
+ C121, C122, C123, C124, C125, C126, C127, C128, C129, C130, C131, C132, C133, C134, C135, C136, C137, C138, C139, C140,
+ C141, C142, C143, C144, C145, C146, C147, C148, C149, C150, C151, C152, C153, C154, C155, C156, C157, C158, C159, C160,
+ C161, C162, C163, C164, C165, C166, C167, C168, C169, C170, C171, C172, C173, C174, C175, C176, C177, C178, C179, C180,
+ C181, C182, C183, C184, C185, C186, C187, C188, C189, C190, C191, C192, C193, C194, C195, C196, C197, C198, C199, C200,
+ C201, C202, C203, C204, C205, C206, C207, C208, C209, C210, C211, C212, C213, C214, C215, C216, C217, C218, C219, C220,
+ C221, C222, C223, C224, C225, C226, C227, C228, C229, C230, C231, C232, C233, C234, C235, C236, C237, C238, C239, C240,
+ C241, C242, C243, C244, C245, C246, C247, C248, C249, C250, C251, C252, C253, C254, C255, C256, C257, C258, C259, C260,
+ C261, C262, C263, C264, C265, C266, C267, C268, C269, C270, C271, C272, C273, C274, C275, C276, C277, C278, C279, C280,
+ C281, C282, C283, C284, C285, C286, C287, C288, C289, C290, C291, C292, C293, C294, C295, C296, C297, C298, C299, C300,
+ C301, C302, C303, C304, C305, C306, C307, C308, C309, C310, C311, C312, C313, C314, C315, C316, C317, C318, C319, C320,
+ C321, C322, C323, C324, C325, C326, C327, C328, C329, C330, C331, C332, C333, C334, C335, C336, C337, C338, C339, C340,
+ C341, C342, C343, C344, C345, C346, C347, C348, C349, C350, C351, C352, C353, C354, C355, C356, C357, C358, C359, C360,
+ C361, C362, C363, C364, C365, C366, C367, C368, C369, C370, C371, C372, C373, C374, C375, C376, C377, C378, C379, C380,
+ C381, C382, C383, C384, C385, C386, C387, C388, C389, C390, C391, C392, C393, C394, C395, C396, C397, C398, C399, C400
};
private static int totalNumCntrs = 400;
@@ -788,9 +814,8 @@
transient protected Map<String, Long> counters;
/**
- * keeps track of unique ProgressCounter enums used
- * this value is used at compile time while assigning ProgressCounter
- * enums to counter names
+ * Keeps track of unique ProgressCounter enums used. This value is used at
+ * compile time while assigning ProgressCounter enums to counter names.
*/
private static int lastEnumUsed;
@@ -804,15 +829,14 @@
/**
* this is called before operator process to buffer some counters
*/
- private void preProcessCounter()
- {
+ private void preProcessCounter() {
inputRows++;
if (counterNameToEnum != null) {
if ((inputRows % 1000) == 0) {
incrCounter(numInputRowsCntr, inputRows);
incrCounter(timeTakenCntr, totalTime);
- inputRows = 0 ;
+ inputRows = 0;
totalTime = 0;
}
beginTime = System.currentTimeMillis();
@@ -822,28 +846,31 @@
/**
* this is called after operator process to buffer some counters
*/
- private void postProcessCounter()
- {
- if (counterNameToEnum != null)
+ private void postProcessCounter() {
+ if (counterNameToEnum != null) {
totalTime += (System.currentTimeMillis() - beginTime);
+ }
}
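The two hooks above implement a buffer-and-flush pattern: per-row bookkeeping stays in plain fields, and the comparatively expensive reporter is only touched every 1000 rows. A self-contained sketch of the same pattern (illustrative only, with a println standing in for reporter.incrCounter):

    public class CounterBufferDemo {
      private long inputRows = 0;
      private long totalTime = 0;
      private long beginTime;

      // Stand-in for reporter.incrCounter(...).
      private void flush(String name, long amount) {
        System.out.println(name + " += " + amount);
      }

      void preProcess() {
        inputRows++;
        if ((inputRows % 1000) == 0) {
          flush("NUM_INPUT_ROWS", inputRows);
          flush("TIME_TAKEN", totalTime);
          inputRows = 0;
          totalTime = 0;
        }
        beginTime = System.currentTimeMillis();
      }

      void postProcess() {
        totalTime += (System.currentTimeMillis() - beginTime);
      }

      public static void main(String[] args) {
        CounterBufferDemo d = new CounterBufferDemo();
        for (int i = 0; i < 2500; i++) {
          d.preProcess();
          d.postProcess();
        }
        // Only two flush points (at rows 1000 and 2000) for 2500 rows.
      }
    }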
-
/**
* this is called in operators in map or reduce tasks
+ *
* @param name
* @param amount
*/
- protected void incrCounter(String name, long amount)
- {
+ protected void incrCounter(String name, long amount) {
String counterName = "CNTR_NAME_" + getOperatorId() + "_" + name;
ProgressCounter pc = counterNameToEnum.get(counterName);
- // Currently, we maintain fixed number of counters per plan - in case of a bigger tree, we may run out of them
- if (pc == null)
- LOG.warn("Using too many counters. Increase the total number of counters for " + counterName);
- else if (reporter != null)
+ // Currently, we maintain a fixed number of counters per plan - in case of a
+ // bigger tree, we may run out of them.
+ if (pc == null) {
+ LOG.warn("Using too many counters. Increase the total number of counters for "
+ + counterName);
+ } else if (reporter != null) {
reporter.incrCounter(pc, amount);
+ }
}
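For clarity, a trivial sketch of how the per-operator counter name is composed; the operator id "SEL_4" and counter name "NUM_INPUT_ROWS" are hypothetical values, not taken from this patch:

    public class CounterNameDemo {
      public static void main(String[] args) {
        String operatorId = "SEL_4"; // hypothetical getOperatorId() result
        String name = "NUM_INPUT_ROWS";
        String counterName = "CNTR_NAME_" + operatorId + "_" + name;
        System.out.println(counterName); // CNTR_NAME_SEL_4_NUM_INPUT_ROWS
      }
    }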
public ArrayList<String> getCounterNames() {
@@ -872,7 +899,9 @@
/**
* called in ExecDriver.progress periodically
- * @param ctrs counters from the running job
+ *
+ * @param ctrs
+ * counters from the running job
*/
@SuppressWarnings("unchecked")
public void updateCounters(Counters ctrs) {
@@ -880,85 +909,101 @@
counters = new HashMap<String, Long>();
}
- // For some old unit tests, the counters will not be populated. Eventually, the old tests should be removed
- if (counterNameToEnum == null)
+ // For some old unit tests, the counters will not be populated. Eventually,
+ // the old tests should be removed
+ if (counterNameToEnum == null) {
return;
+ }
- for (Map.Entry<String, ProgressCounter> counter: counterNameToEnum.entrySet()) {
+ for (Map.Entry<String, ProgressCounter> counter : counterNameToEnum
+ .entrySet()) {
counters.put(counter.getKey(), ctrs.getCounter(counter.getValue()));
}
// update counters of child operators
// this won't be an infinite loop since the operator graph is acyclic
// but, some operators may be updated more than once and that's ok
if (getChildren() != null) {
- for (Node op: getChildren()) {
- ((Operator<? extends Serializable>)op).updateCounters(ctrs);
+ for (Node op : getChildren()) {
+ ((Operator<? extends Serializable>) op).updateCounters(ctrs);
}
}
}
/**
- * Recursively check this operator and its descendants to see if the
- * fatal error counter is set to non-zero.
+ * Recursively check this operator and its descendants to see if the fatal
+ * error counter is set to non-zero.
+ *
* @param ctrs
*/
public boolean checkFatalErrors(Counters ctrs, StringBuffer errMsg) {
- if ( counterNameToEnum == null )
+ if (counterNameToEnum == null) {
return false;
-
+ }
+
String counterName = "CNTR_NAME_" + getOperatorId() + "_" + fatalErrorCntr;
ProgressCounter pc = counterNameToEnum.get(counterName);
- // Currently, we maintain fixed number of counters per plan - in case of a bigger tree, we may run out of them
- if (pc == null)
- LOG.warn("Using too many counters. Increase the total number of counters for " + counterName);
- else {
+ // Currently, we maintain a fixed number of counters per plan - in case of a
+ // bigger tree, we may run out of them.
+ if (pc == null) {
+ LOG.warn("Using too many counters. Increase the total number of counters for "
+ + counterName);
+ } else {
long value = ctrs.getCounter(pc);
fatalErrorMessage(errMsg, value);
- if ( value != 0 )
+ if (value != 0) {
return true;
+ }
}
-
+
if (getChildren() != null) {
- for (Node op: getChildren()) {
- if (((Operator<? extends Serializable>)op).checkFatalErrors(ctrs, errMsg)) {
+ for (Node op : getChildren()) {
+ if (((Operator<? extends Serializable>) op).checkFatalErrors(ctrs,
+ errMsg)) {
return true;
}
}
}
return false;
}
-
- /**
+
+ /**
* Get the fatal error message based on counter's code.
- * @param errMsg error message should be appended to this output parameter.
- * @param counterValue input counter code.
+ *
+ * @param errMsg
+ * error message should be appended to this output parameter.
+ * @param counterValue
+ * input counter code.
*/
protected void fatalErrorMessage(StringBuffer errMsg, long counterValue) {
}
-
+
// A given query can have multiple map-reduce jobs
public static void resetLastEnumUsed() {
lastEnumUsed = 0;
}
/**
- * Called only in SemanticAnalyzer after all operators have added their
- * own set of counter names
+ * Called only in SemanticAnalyzer after all operators have added their own
+ * set of counter names
*/
public void assignCounterNameToEnum() {
if (counterNameToEnum != null) {
return;
}
counterNameToEnum = new HashMap<String, ProgressCounter>();
- for (String counterName: getCounterNames()) {
+ for (String counterName : getCounterNames()) {
++lastEnumUsed;
// TODO Hack for hadoop-0.17
- // Currently, only maximum number of 'totalNumCntrs' can be used. If you want
- // to add more counters, increase the number of counters in ProgressCounter
+ // Currently, at most 'totalNumCntrs' counters can be used. If you want
+ // to add more counters, increase the number of counters in
+ // ProgressCounter.
if (lastEnumUsed > totalNumCntrs) {
- LOG.warn("Using too many counters. Increase the total number of counters");
+ LOG.warn("Using too many counters. Increase the total number of counters");
return;
}
String enumName = "C" + lastEnumUsed;
@@ -967,10 +1012,10 @@
}
}
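A standalone sketch of the assignment scheme (illustrative only; the real enum has 400 constants and the counter names below are hypothetical): each distinct counter name is bound, in order, to the next unused ProgressCounter constant via Enum.valueOf:

    import java.util.HashMap;
    import java.util.Map;

    public class AssignCounterDemo {
      enum ProgressCounter {
        C1, C2, C3, C4 // truncated for the sketch
      }

      private static int lastEnumUsed = 0;

      public static void main(String[] args) {
        Map<String, ProgressCounter> counterNameToEnum =
            new HashMap<String, ProgressCounter>();
        for (String counterName : new String[] {
            "CNTR_NAME_SEL_4_NUM_INPUT_ROWS",
            "CNTR_NAME_SEL_4_NUM_OUTPUT_ROWS" }) {
          ++lastEnumUsed;
          counterNameToEnum.put(counterName,
              ProgressCounter.valueOf("C" + lastEnumUsed));
        }
        System.out.println(counterNameToEnum); // maps to C1 and C2 (order varies)
      }
    }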
- protected static String numInputRowsCntr = "NUM_INPUT_ROWS";
+ protected static String numInputRowsCntr = "NUM_INPUT_ROWS";
protected static String numOutputRowsCntr = "NUM_OUTPUT_ROWS";
- protected static String timeTakenCntr = "TIME_TAKEN";
- protected static String fatalErrorCntr = "FATAL_ERROR";
+ protected static String timeTakenCntr = "TIME_TAKEN";
+ protected static String fatalErrorCntr = "FATAL_ERROR";
public void initializeCounters() {
initOperatorId();
@@ -986,32 +1031,32 @@
}
/*
- * By default, the list is empty - if an operator wants to add more counters, it should override this method
- * and provide the new list.
-
+ * By default, the list is empty - if an operator wants to add more counters,
+ * it should override this method and provide the new list.
*/
protected List<String> getAdditionalCounters() {
return null;
}
-
+
public HashMap<String, ProgressCounter> getCounterNameToEnum() {
return counterNameToEnum;
}
- public void setCounterNameToEnum(HashMap<String, ProgressCounter> counterNameToEnum) {
+ public void setCounterNameToEnum(
+ HashMap<String, ProgressCounter> counterNameToEnum) {
this.counterNameToEnum = counterNameToEnum;
}
- /**
- * Should be overridden to return the type of the specific operator among
- * the types in OperatorType
- *
- * @return OperatorType.* or -1 if not overridden
- */
- public int getType() {
- assert false;
- return -1;
- }
+ /**
+ * Should be overridden to return the type of the specific operator among the
+ * types in OperatorType
+ *
+ * @return OperatorType.* or -1 if not overridden
+ */
+ public int getType() {
+ assert false;
+ return -1;
+ }
public void setGroupKeyObject(Object keyObject) {
this.groupKeyObject = keyObject;