Posted to commits@hive.apache.org by zs...@apache.org on 2010/01/21 11:38:15 UTC
svn commit: r901644 [11/37] - in /hadoop/hive/trunk: ./
ql/src/java/org/apache/hadoop/hive/ql/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/
ql/src/java/org/apache/hadoop/hive/ql/history/ ql/src/jav...
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/FlatFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/FlatFileInputFormat.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/FlatFileInputFormat.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/FlatFileInputFormat.java Thu Jan 21 10:37:58 2010
@@ -18,82 +18,85 @@
package org.apache.hadoop.hive.ql.io;
-import java.io.IOException;
+import java.io.DataInputStream;
import java.io.EOFException;
+import java.io.IOException;
import java.io.InputStream;
-import java.io.DataInputStream;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
-
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serialization;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configurable;
-
-import org.apache.hadoop.io.serializer.Serialization;
-import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Deserializer;
-
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;
-/** An {@link org.apache.hadoop.mapred.InputFormat} for Plain files with {@link Deserializer} records */
-public class FlatFileInputFormat<T> extends FileInputFormat<Void, FlatFileInputFormat.RowContainer<T>> {
+/**
+ * An {@link org.apache.hadoop.mapred.InputFormat} for Plain files with
+ * {@link Deserializer} records
+ */
+public class FlatFileInputFormat<T> extends
+ FileInputFormat<Void, FlatFileInputFormat.RowContainer<T>> {
/**
- * A work-around until HADOOP-1230 is fixed.
- *
- * Allows boolean next(k,v) to be called by reference but still allow the deserializer to create a new
- * object (i.e., row) on every call to next.
+ * A work-around until HADOOP-1230 is fixed.
+ *
+ * Allows boolean next(k,v) to be called by reference but still allow the
+ * deserializer to create a new object (i.e., row) on every call to next.
*/
static public class RowContainer<T> {
T row;
}
/**
- * An implementation of SerializationContext is responsible for looking up the Serialization implementation
- * for the given RecordReader. Potentially based on the Configuration or some other mechanism
- *
- * The SerializationFactory does not give this functionality since:
- * 1. Requires Serialization implementations to be specified in the Configuration a-priori (although same as setting
- * a SerializationContext)
- * 2. Does not lookup the actual subclass being deserialized. e.g., for Serializable does not have a way of configuring
- * the actual Java class being serialized/deserialized.
+ * An implementation of SerializationContext is responsible for looking up the
+ * Serialization implementation for the given RecordReader. Potentially based
+ * on the Configuration or some other mechanism
+ *
+ * The SerializationFactory does not give this functionality since: 1.
+ * Requires Serialization implementations to be specified in the Configuration
+ * a-priori (although same as setting a SerializationContext) 2. Does not
+ * lookup the actual subclass being deserialized. e.g., for Serializable does
+ * not have a way of configuring the actual Java class being
+ * serialized/deserialized.
*/
static public interface SerializationContext<S> extends Configurable {
/**
- * An {@link Serialization} object for objects of type S
+ * An {@link Serialization} object for objects of type S
+ *
* @return a serialization object for this context
*/
public Serialization<S> getSerialization() throws IOException;
/**
- * Produces the specific class to deserialize
+ * Produces the specific class to deserialize
*/
public Class<? extends S> getRealClass() throws IOException;
}
-
+
/**
* The JobConf keys for the Serialization implementation
*/
static public final String SerializationImplKey = "mapred.input.serialization.implKey";
/**
- * An implementation of {@link SerializationContext} that reads the Serialization class and
- * specific subclass to be deserialized from the JobConf.
- *
+ * An implementation of {@link SerializationContext} that reads the
+ * Serialization class and specific subclass to be deserialized from the
+ * JobConf.
+ *
*/
- static public class SerializationContextFromConf<S> implements FlatFileInputFormat.SerializationContext<S> {
+ static public class SerializationContextFromConf<S> implements
+ FlatFileInputFormat.SerializationContext<S> {
/**
* The JobConf keys for the Class that is being deserialized.
@@ -101,57 +104,67 @@
static public final String SerializationSubclassKey = "mapred.input.serialization.subclassKey";
/**
- * Implements configurable so it can use the configuration to find the right classes
- * Note: ReflectionUtils will automatigically call setConf with the right configuration.
+ * Implements configurable so it can use the configuration to find the right
+ * classes Note: ReflectionUtils will automatigically call setConf with the
+ * right configuration.
*/
private Configuration conf;
- public void setConf(Configuration conf) {
- this.conf = conf;
+ public void setConf(Configuration conf) {
+ this.conf = conf;
}
- public Configuration getConf() {
- return conf;
+ public Configuration getConf() {
+ return conf;
}
/**
* @return the actual class being deserialized
- * @exception does not currently throw IOException
+ * @exception does
+ * not currently throw IOException
*/
public Class<S> getRealClass() throws IOException {
- return (Class<S>)conf.getClass(SerializationSubclassKey, null, Object.class);
+ return (Class<S>) conf.getClass(SerializationSubclassKey, null,
+ Object.class);
}
/**
* Looks up and instantiates the Serialization Object
- *
- * Important to note here that we are not relying on the Hadoop SerializationFactory part of the
- * Serialization framework. This is because in the case of Non-Writable Objects, we cannot make any
- * assumptions about the uniformity of the serialization class APIs - i.e., there may not be a "write"
- * method call and a subclass may need to implement its own Serialization classes.
- * The SerializationFactory currently returns the first (de)serializer that is compatible
- * with the class to be deserialized; in this context, that assumption isn't necessarily true.
- *
+ *
+ * Important to note here that we are not relying on the Hadoop
+ * SerializationFactory part of the Serialization framework. This is because
+ * in the case of Non-Writable Objects, we cannot make any assumptions about
+ * the uniformity of the serialization class APIs - i.e., there may not be a
+ * "write" method call and a subclass may need to implement its own
+ * Serialization classes. The SerializationFactory currently returns the
+ * first (de)serializer that is compatible with the class to be
+ * deserialized; in this context, that assumption isn't necessarily true.
+ *
* @return the serialization object for this context
- * @exception does not currently throw any IOException
+ * @exception does
+ * not currently throw any IOException
*/
public Serialization<S> getSerialization() throws IOException {
- Class<Serialization<S>> tClass = (Class<Serialization<S>>)conf.getClass(SerializationImplKey, null, Serialization.class);
- return tClass == null ? null : (Serialization<S>)ReflectionUtils.newInstance(tClass, conf);
+ Class<Serialization<S>> tClass = (Class<Serialization<S>>) conf.getClass(
+ SerializationImplKey, null, Serialization.class);
+ return tClass == null ? null : (Serialization<S>) ReflectionUtils
+ .newInstance(tClass, conf);
}
}
- /**
- * An {@link RecordReader} for plain files with {@link Deserializer} records
- *
- * Reads one row at a time of type R.
- * R is intended to be a base class of something such as: Record, Writable, Text, ...
- *
+ /**
+ * An {@link RecordReader} for plain files with {@link Deserializer} records
+ *
+ * Reads one row at a time of type R. R is intended to be a base class of
+ * something such as: Record, Writable, Text, ...
+ *
*/
- public class FlatFileRecordReader<R> implements RecordReader<Void, FlatFileInputFormat.RowContainer<R>> {
+ public class FlatFileRecordReader<R> implements
+ RecordReader<Void, FlatFileInputFormat.RowContainer<R>> {
/**
- * An interface for a helper class for instantiating {@link Serialization} classes.
+ * An interface for a helper class for instantiating {@link Serialization}
+ * classes.
*/
/**
* The stream in use - is fsin if not compressed, otherwise, it is dcin.
@@ -179,33 +192,37 @@
private final Deserializer<R> deserializer;
/**
- * Once EOF is reached, stop calling the deserializer
+ * Once EOF is reached, stop calling the deserializer
*/
private boolean isEOF;
/**
- * The JobConf which contains information needed to instantiate the correct Deserializer
+ * The JobConf which contains information needed to instantiate the correct
+ * Deserializer
*/
- private Configuration conf;
+ private final Configuration conf;
/**
- * The actual class of the row's we are deserializing, not just the base class
+ * The actual class of the row's we are deserializing, not just the base
+ * class
*/
- private Class<R> realRowClass;
-
+ private final Class<R> realRowClass;
/**
- * FlatFileRecordReader constructor constructs the underlying stream (potentially decompressed) and
- * creates the deserializer.
- *
- * @param conf the jobconf
- * @param split the split for this file
+ * FlatFileRecordReader constructor constructs the underlying stream
+ * (potentially decompressed) and creates the deserializer.
+ *
+ * @param conf
+ * the jobconf
+ * @param split
+ * the split for this file
*/
- public FlatFileRecordReader(Configuration conf,
- FileSplit split) throws IOException {
+ public FlatFileRecordReader(Configuration conf, FileSplit split)
+ throws IOException {
final Path path = split.getPath();
FileSystem fileSys = path.getFileSystem(conf);
- CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(conf);
+ CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(
+ conf);
final CompressionCodec codec = compressionCodecs.getCodec(path);
this.conf = conf;
@@ -221,28 +238,26 @@
isEOF = false;
end = split.getLength();
- // Instantiate a SerializationContext which this will use to lookup the Serialization class and the
+ // Instantiate a SerializationContext which this will use to lookup the
+ // Serialization class and the
// actual class being deserialized
SerializationContext<R> sinfo;
- Class<SerializationContext<R>> sinfoClass =
- (Class<SerializationContext<R>>)conf.getClass(SerializationContextImplKey, SerializationContextFromConf.class);
+ Class<SerializationContext<R>> sinfoClass = (Class<SerializationContext<R>>) conf
+ .getClass(SerializationContextImplKey,
+ SerializationContextFromConf.class);
- sinfo = (SerializationContext<R>)ReflectionUtils.newInstance(sinfoClass, conf);
+ sinfo = (SerializationContext<R>) ReflectionUtils.newInstance(sinfoClass,
+ conf);
// Get the Serialization object and the class being deserialized
Serialization<R> serialization = sinfo.getSerialization();
- realRowClass = (Class<R>)sinfo.getRealClass();
+ realRowClass = (Class<R>) sinfo.getRealClass();
- deserializer = (Deserializer<R>)serialization.getDeserializer((Class<R>)realRowClass);
+ deserializer = serialization.getDeserializer(realRowClass);
deserializer.open(in);
}
/**
- * The actual class of the data being deserialized
- */
- private Class<R> realRowclass;
-
- /**
* The JobConf key of the SerializationContext to use
*/
static public final String SerializationContextImplKey = "mapred.input.serialization.context_impl";
@@ -250,34 +265,41 @@
/**
* @return null
*/
- public Void createKey() {
+ public Void createKey() {
return null;
}
/**
* @return a new R instance.
*/
- public RowContainer<R> createValue() {
+ public RowContainer<R> createValue() {
RowContainer<R> r = new RowContainer<R>();
- r.row = (R)ReflectionUtils.newInstance(realRowClass, conf);
+ r.row = (R) ReflectionUtils.newInstance(realRowClass, conf);
return r;
}
/**
* Returns the next row # and value
- *
- * @param key - void as these files have a value only
- * @param value - the row container which is always re-used, but the internal value may be set to a new Object
- * @return whether the key and value were read. True if they were and false if EOF
- * @exception IOException from the deserializer
- */
- public synchronized boolean next(Void key, RowContainer<R> value) throws IOException {
- if(isEOF || in.available() == 0) {
+ *
+ * @param key
+ * - void as these files have a value only
+ * @param value
+ * - the row container which is always re-used, but the internal
+ * value may be set to a new Object
+ * @return whether the key and value were read. True if they were and false
+ * if EOF
+ * @exception IOException
+ * from the deserializer
+ */
+ public synchronized boolean next(Void key, RowContainer<R> value)
+ throws IOException {
+ if (isEOF || in.available() == 0) {
isEOF = true;
return false;
}
- // the deserializer is responsible for actually reading each record from the stream
+ // the deserializer is responsible for actually reading each record from
+ // the stream
try {
value.row = deserializer.deserialize(value.row);
if (value.row == null) {
@@ -285,27 +307,27 @@
return false;
}
return true;
- } catch(EOFException e) {
+ } catch (EOFException e) {
isEOF = true;
return false;
}
}
public synchronized float getProgress() throws IOException {
- // this assumes no splitting
+ // this assumes no splitting
if (end == 0) {
return 0.0f;
} else {
- // gives progress over uncompressed stream
+ // gives progress over uncompressed stream
// assumes deserializer is not buffering itself
- return Math.min(1.0f, fsin.getPos()/(float)(end));
+ return Math.min(1.0f, fsin.getPos() / (float) (end));
}
}
public synchronized long getPos() throws IOException {
// assumes deserializer is not buffering itself
- // position over uncompressed stream. not sure what
- // effect this has on stats about job
+ // position over uncompressed stream. not sure what
+ // effect this has on stats about job
return fsin.getPos();
}
@@ -319,9 +341,9 @@
return false;
}
+ @Override
public RecordReader<Void, RowContainer<T>> getRecordReader(InputSplit split,
- JobConf job, Reporter reporter)
- throws IOException {
+ JobConf job, Reporter reporter) throws IOException {
reporter.setStatus(split.toString());
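For context, a minimal driver sketch (illustrative only, not part of this patch) showing how the JobConf keys defined in FlatFileInputFormat above are meant to be wired up. The row class MyRow and the choice of JavaSerialization are assumptions for the example; only the key names and the default SerializationContextFromConf behaviour come from the code above.

    import java.io.Serializable;
    import org.apache.hadoop.hive.ql.io.FlatFileInputFormat;
    import org.apache.hadoop.io.serializer.JavaSerialization;
    import org.apache.hadoop.io.serializer.Serialization;
    import org.apache.hadoop.mapred.JobConf;

    public class FlatFileJobSetup {
      /** Hypothetical serializable row type used only for illustration. */
      public static class MyRow implements Serializable {
        public long id;
      }

      public static void configure(JobConf job) {
        job.setInputFormat(FlatFileInputFormat.class);
        // Serialization implementation that SerializationContextFromConf will look up
        job.setClass("mapred.input.serialization.implKey",
            JavaSerialization.class, Serialization.class);
        // Concrete class handed to the deserializer for every row
        job.setClass("mapred.input.serialization.subclassKey", MyRow.class, Object.class);
        // The default context already reads both keys from the JobConf, so overriding
        // "mapred.input.serialization.context_impl" is only needed for custom contexts.
      }
    }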
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java Thu Jan 21 10:37:58 2010
@@ -25,7 +25,6 @@
import java.util.Properties;
import java.util.Set;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -45,14 +44,12 @@
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.util.StringUtils;
/**
* An util class for various Hive file format tasks.
- * registerOutputFormatSubstitute(Class, Class)
- * getOutputFormatSubstitute(Class) are added for backward
- * compatibility. They return the newly added HiveOutputFormat for the older
- * ones.
+ * registerOutputFormatSubstitute(Class, Class) getOutputFormatSubstitute(Class)
+ * are added for backward compatibility. They return the newly added
+ * HiveOutputFormat for the older ones.
*
*/
public class HiveFileFormatUtils {
@@ -67,7 +64,7 @@
@SuppressWarnings("unchecked")
private static Map<Class<? extends OutputFormat>, Class<? extends HiveOutputFormat>> outputFormatSubstituteMap;
-
+
/**
* register a substitute
*
@@ -88,8 +85,9 @@
@SuppressWarnings("unchecked")
public synchronized static Class<? extends HiveOutputFormat> getOutputFormatSubstitute(
Class<?> origin) {
- if (HiveOutputFormat.class.isAssignableFrom(origin))
+ if (HiveOutputFormat.class.isAssignableFrom(origin)) {
return (Class<? extends HiveOutputFormat>) origin;
+ }
Class<? extends HiveOutputFormat> result = outputFormatSubstituteMap
.get(origin);
return result;
@@ -112,11 +110,13 @@
}
return defaultFinalPath;
}
-
+
static {
inputFormatCheckerMap = new HashMap<Class<? extends InputFormat>, Class<? extends InputFormatChecker>>();
- HiveFileFormatUtils.registerInputFormatChecker(SequenceFileInputFormat.class, SequenceFileInputFormatChecker.class);
- HiveFileFormatUtils.registerInputFormatChecker(RCFileInputFormat.class, RCFileInputFormat.class);
+ HiveFileFormatUtils.registerInputFormatChecker(
+ SequenceFileInputFormat.class, SequenceFileInputFormatChecker.class);
+ HiveFileFormatUtils.registerInputFormatChecker(RCFileInputFormat.class,
+ RCFileInputFormat.class);
inputFormatCheckerInstanceCache = new HashMap<Class<? extends InputFormatChecker>, InputFormatChecker>();
}
@@ -139,12 +139,13 @@
inputFormatCheckerMap.put(format, checker);
}
- /**
+ /**
* get an InputFormatChecker for a file format.
*/
public synchronized static Class<? extends InputFormatChecker> getInputFormatChecker(
Class<?> inputFormat) {
- Class<? extends InputFormatChecker> result = inputFormatCheckerMap.get(inputFormat);
+ Class<? extends InputFormatChecker> result = inputFormatCheckerMap
+ .get(inputFormat);
return result;
}
@@ -157,14 +158,15 @@
throws HiveException {
if (files.size() > 0) {
Class<? extends InputFormatChecker> checkerCls = getInputFormatChecker(inputFormatCls);
- if(checkerCls==null && inputFormatCls.isAssignableFrom(TextInputFormat.class)) {
+ if (checkerCls == null
+ && inputFormatCls.isAssignableFrom(TextInputFormat.class)) {
// we get a text input format here, we can not determine a file is text
// according to its content, so we can do is to test if other file
// format can accept it. If one other file format can accept this file,
// we treat this file as text file, although it maybe not.
- return checkTextInputFormat(fs, conf, files);
+ return checkTextInputFormat(fs, conf, files);
}
-
+
if (checkerCls != null) {
InputFormatChecker checkerInstance = inputFormatCheckerInstanceCache
.get(checkerCls);
@@ -190,25 +192,27 @@
.keySet();
for (Class<? extends InputFormat> reg : inputFormatter) {
boolean result = checkInputFormat(fs, conf, reg, files);
- if (result)
+ if (result) {
return false;
+ }
}
return true;
}
-
-
+
public static RecordWriter getHiveRecordWriter(JobConf jc,
tableDesc tableInfo, Class<? extends Writable> outputClass,
fileSinkDesc conf, Path outPath) throws HiveException {
try {
- HiveOutputFormat<?, ?> hiveOutputFormat = tableInfo.getOutputFileFormatClass().newInstance();
+ HiveOutputFormat<?, ?> hiveOutputFormat = tableInfo
+ .getOutputFileFormatClass().newInstance();
boolean isCompressed = conf.getCompressed();
JobConf jc_output = jc;
if (isCompressed) {
jc_output = new JobConf(jc);
String codecStr = conf.getCompressCodec();
if (codecStr != null && !codecStr.trim().equals("")) {
- Class<? extends CompressionCodec> codec = (Class<? extends CompressionCodec>) Class.forName(codecStr);
+ Class<? extends CompressionCodec> codec = (Class<? extends CompressionCodec>) Class
+ .forName(codecStr);
FileOutputFormat.setOutputCompressorClass(jc_output, codec);
}
String type = conf.getCompressType();
@@ -234,5 +238,5 @@
}
return null;
}
-
+
}
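A short usage sketch (illustrative, not from the patch) of the substitute lookup reformatted above: callers ask HiveFileFormatUtils for the HiveOutputFormat that stands in for a legacy OutputFormat class and handle the null case when nothing was registered.

    import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
    import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
    import org.apache.hadoop.mapred.TextOutputFormat;

    public class SubstituteLookupSketch {
      public static Class<? extends HiveOutputFormat> resolve(Class<?> legacyFormat) {
        // Returns the class itself if it is already a HiveOutputFormat, the registered
        // replacement otherwise, or null when no substitute exists.
        return HiveFileFormatUtils.getOutputFormatSubstitute(legacyFormat);
      }

      public static void main(String[] args) {
        System.out.println(resolve(TextOutputFormat.class));
      }
    }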
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java Thu Jan 21 10:37:58 2010
@@ -77,8 +77,8 @@
final int finalRowSeparator = rowSeparator;
FileSystem fs = outPath.getFileSystem(jc);
- final OutputStream outStream = Utilities.createCompressedStream(jc,
- fs.create(outPath), isCompressed);
+ final OutputStream outStream = Utilities.createCompressedStream(jc, fs
+ .create(outPath), isCompressed);
return new RecordWriter() {
public void write(Writable r) throws IOException {
if (r instanceof Text) {
@@ -102,7 +102,7 @@
protected static class IgnoreKeyWriter<K extends WritableComparable, V extends Writable>
implements org.apache.hadoop.mapred.RecordWriter<K, V> {
- private org.apache.hadoop.mapred.RecordWriter<K, V> mWriter;
+ private final org.apache.hadoop.mapred.RecordWriter<K, V> mWriter;
public IgnoreKeyWriter(org.apache.hadoop.mapred.RecordWriter<K, V> writer) {
this.mWriter = writer;
@@ -117,6 +117,7 @@
}
}
+ @Override
public org.apache.hadoop.mapred.RecordWriter<K, V> getRecordWriter(
FileSystem ignored, JobConf job, String name, Progressable progress)
throws IOException {
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Thu Jan 21 10:37:58 2010
@@ -22,9 +22,7 @@
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
-import java.net.URLClassLoader;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -39,52 +37,52 @@
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.plan.mapredWork;
-import org.apache.hadoop.hive.ql.plan.tableDesc;
import org.apache.hadoop.hive.ql.plan.partitionDesc;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
-
-import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-
/**
- * HiveInputFormat is a parameterized InputFormat which looks at the path name and determine
- * the correct InputFormat for that path name from mapredPlan.pathToPartitionInfo().
- * It can be used to read files with different input format in the same map-reduce job.
+ * HiveInputFormat is a parameterized InputFormat which looks at the path name
+ * and determine the correct InputFormat for that path name from
+ * mapredPlan.pathToPartitionInfo(). It can be used to read files with different
+ * input format in the same map-reduce job.
*/
-public class HiveInputFormat<K extends WritableComparable,
- V extends Writable> implements InputFormat<K, V>, JobConfigurable {
+public class HiveInputFormat<K extends WritableComparable, V extends Writable>
+ implements InputFormat<K, V>, JobConfigurable {
- public static final Log LOG =
- LogFactory.getLog("org.apache.hadoop.hive.ql.io.HiveInputFormat");
+ public static final Log LOG = LogFactory
+ .getLog("org.apache.hadoop.hive.ql.io.HiveInputFormat");
/**
- * HiveInputSplit encapsulates an InputSplit with its corresponding inputFormatClass.
- * The reason that it derives from FileSplit is to make sure "map.input.file" in MapTask.
+ * HiveInputSplit encapsulates an InputSplit with its corresponding
+ * inputFormatClass. The reason that it derives from FileSplit is to make sure
+ * "map.input.file" in MapTask.
*/
- public static class HiveInputSplit extends FileSplit implements InputSplit, Configurable {
+ public static class HiveInputSplit extends FileSplit implements InputSplit,
+ Configurable {
InputSplit inputSplit;
- String inputFormatClassName;
+ String inputFormatClassName;
public HiveInputSplit() {
// This is the only public constructor of FileSplit
- super((Path)null, 0, 0, (String[])null);
+ super((Path) null, 0, 0, (String[]) null);
}
public HiveInputSplit(InputSplit inputSplit, String inputFormatClassName) {
// This is the only public constructor of FileSplit
- super((Path)null, 0, 0, (String[])null);
+ super((Path) null, 0, 0, (String[]) null);
this.inputSplit = inputSplit;
this.inputFormatClassName = inputFormatClassName;
}
@@ -92,29 +90,34 @@
public InputSplit getInputSplit() {
return inputSplit;
}
+
public String inputFormatClassName() {
return inputFormatClassName;
}
+ @Override
public Path getPath() {
if (inputSplit instanceof FileSplit) {
- return ((FileSplit)inputSplit).getPath();
+ return ((FileSplit) inputSplit).getPath();
}
return new Path("");
}
/** The position of the first byte in the file to process. */
+ @Override
public long getStart() {
if (inputSplit instanceof FileSplit) {
- return ((FileSplit)inputSplit).getStart();
+ return ((FileSplit) inputSplit).getStart();
}
return 0;
}
+ @Override
public String toString() {
return inputFormatClassName + ":" + inputSplit.toString();
}
+ @Override
public long getLength() {
long r = 0;
try {
@@ -125,23 +128,27 @@
return r;
}
+ @Override
public String[] getLocations() throws IOException {
return inputSplit.getLocations();
}
+ @Override
public void readFields(DataInput in) throws IOException {
String inputSplitClassName = in.readUTF();
try {
- inputSplit = (InputSplit) ReflectionUtils.newInstance(
- conf.getClassByName(inputSplitClassName), conf);
+ inputSplit = (InputSplit) ReflectionUtils.newInstance(conf
+ .getClassByName(inputSplitClassName), conf);
} catch (Exception e) {
- throw new IOException("Cannot create an instance of InputSplit class = "
- + inputSplitClassName + ":" + e.getMessage());
+ throw new IOException(
+ "Cannot create an instance of InputSplit class = "
+ + inputSplitClassName + ":" + e.getMessage());
}
inputSplit.readFields(in);
inputFormatClassName = in.readUTF();
}
+ @Override
public void write(DataOutput out) throws IOException {
out.writeUTF(inputSplit.getClass().getName());
inputSplit.write(out);
@@ -149,7 +156,7 @@
}
Configuration conf;
-
+
@Override
public Configuration getConf() {
return conf;
@@ -170,19 +177,21 @@
/**
* A cache of InputFormat instances.
*/
- private static Map<Class,InputFormat<WritableComparable, Writable>> inputFormats;
- static InputFormat<WritableComparable, Writable> getInputFormatFromCache(Class inputFormatClass, JobConf job) throws IOException {
+ private static Map<Class, InputFormat<WritableComparable, Writable>> inputFormats;
+
+ static InputFormat<WritableComparable, Writable> getInputFormatFromCache(
+ Class inputFormatClass, JobConf job) throws IOException {
if (inputFormats == null) {
inputFormats = new HashMap<Class, InputFormat<WritableComparable, Writable>>();
}
if (!inputFormats.containsKey(inputFormatClass)) {
try {
- InputFormat<WritableComparable, Writable> newInstance =
- (InputFormat<WritableComparable, Writable>)ReflectionUtils.newInstance(inputFormatClass, job);
+ InputFormat<WritableComparable, Writable> newInstance = (InputFormat<WritableComparable, Writable>) ReflectionUtils
+ .newInstance(inputFormatClass, job);
inputFormats.put(inputFormatClass, newInstance);
} catch (Exception e) {
- throw new IOException("Cannot create an instance of InputFormat class " + inputFormatClass.getName()
- + " as specified in mapredWork!");
+ throw new IOException("Cannot create an instance of InputFormat class "
+ + inputFormatClass.getName() + " as specified in mapredWork!");
}
}
return inputFormats.get(inputFormatClass);
@@ -191,7 +200,7 @@
public RecordReader getRecordReader(InputSplit split, JobConf job,
Reporter reporter) throws IOException {
- HiveInputSplit hsplit = (HiveInputSplit)split;
+ HiveInputSplit hsplit = (HiveInputSplit) split;
InputSplit inputSplit = hsplit.getInputSplit();
String inputFormatClassName = null;
@@ -203,14 +212,15 @@
throw new IOException("cannot find class " + inputFormatClassName);
}
- //clone a jobConf for setting needed columns for reading
+ // clone a jobConf for setting needed columns for reading
JobConf cloneJobConf = new JobConf(job);
- initColumnsNeeded(cloneJobConf, inputFormatClass, hsplit.getPath().toString(),
- hsplit.getPath().toUri().getPath());
+ initColumnsNeeded(cloneJobConf, inputFormatClass, hsplit.getPath()
+ .toString(), hsplit.getPath().toUri().getPath());
- InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, cloneJobConf);
+ InputFormat inputFormat = getInputFormatFromCache(inputFormatClass,
+ cloneJobConf);
return new HiveRecordReader(inputFormat.getRecordReader(inputSplit,
- cloneJobConf, reporter));
+ cloneJobConf, reporter));
}
private Map<String, partitionDesc> pathToPartitionInfo;
@@ -233,16 +243,17 @@
ArrayList<InputSplit> result = new ArrayList<InputSplit>();
// for each dir, get the InputFormat, and do getSplits.
- for(Path dir: dirs) {
- partitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir);
- // create a new InputFormat instance if this is the first time to see this class
+ for (Path dir : dirs) {
+ partitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir);
+ // create a new InputFormat instance if this is the first time to see this
+ // class
Class inputFormatClass = part.getInputFileFormatClass();
InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
FileInputFormat.setInputPaths(newjob, dir);
newjob.setInputFormat(inputFormat.getClass());
- InputSplit[] iss = inputFormat.getSplits(newjob, numSplits/dirs.length);
- for(InputSplit is: iss) {
+ InputSplit[] iss = inputFormat.getSplits(newjob, numSplits / dirs.length);
+ for (InputSplit is : iss) {
result.add(new HiveInputSplit(is, inputFormatClass.getName()));
}
}
@@ -260,10 +271,12 @@
JobConf newjob = new JobConf(job);
// for each dir, get the InputFormat, and do validateInput.
- for (Path dir: dirs) {
+ for (Path dir : dirs) {
partitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir);
- // create a new InputFormat instance if this is the first time to see this class
- InputFormat inputFormat = getInputFormatFromCache(part.getInputFileFormatClass(), job);
+ // create a new InputFormat instance if this is the first time to see this
+ // class
+ InputFormat inputFormat = getInputFormatFromCache(part
+ .getInputFileFormatClass(), job);
FileInputFormat.setInputPaths(newjob, dir);
newjob.setInputFormat(inputFormat.getClass());
@@ -271,47 +284,53 @@
}
}
- protected static partitionDesc getPartitionDescFromPath(Map<String, partitionDesc> pathToPartitionInfo,
- Path dir) throws IOException {
+ protected static partitionDesc getPartitionDescFromPath(
+ Map<String, partitionDesc> pathToPartitionInfo, Path dir)
+ throws IOException {
partitionDesc partDesc = pathToPartitionInfo.get(dir.toString());
if (partDesc == null) {
partDesc = pathToPartitionInfo.get(dir.toUri().getPath());
}
if (partDesc == null) {
- throw new IOException("cannot find dir = " + dir.toString() + " in partToPartitionInfo!");
+ throw new IOException("cannot find dir = " + dir.toString()
+ + " in partToPartitionInfo!");
}
return partDesc;
}
protected void initColumnsNeeded(JobConf jobConf, Class inputFormatClass,
- String splitPath, String splitPathWithNoSchema) {
- if (this.mrwork == null)
+ String splitPath, String splitPathWithNoSchema) {
+ if (this.mrwork == null) {
init(job);
+ }
ArrayList<String> aliases = new ArrayList<String>();
- Iterator<Entry<String, ArrayList<String>>> iterator =
- this.mrwork.getPathToAliases().entrySet().iterator();
+ Iterator<Entry<String, ArrayList<String>>> iterator = this.mrwork
+ .getPathToAliases().entrySet().iterator();
while (iterator.hasNext()) {
Entry<String, ArrayList<String>> entry = iterator.next();
String key = entry.getKey();
if (splitPath.startsWith(key) || splitPathWithNoSchema.startsWith(key)) {
ArrayList<String> list = entry.getValue();
- for (String val : list)
+ for (String val : list) {
aliases.add(val);
+ }
}
}
for (String alias : aliases) {
- Operator<? extends Serializable> op = this.mrwork.getAliasToWork().get(alias);
+ Operator<? extends Serializable> op = this.mrwork.getAliasToWork().get(
+ alias);
if (op instanceof TableScanOperator) {
TableScanOperator tableScan = (TableScanOperator) op;
ArrayList<Integer> list = tableScan.getNeededColumnIDs();
- if (list != null)
- ColumnProjectionUtils.appendReadColumnIDs(jobConf, list);
- else
- ColumnProjectionUtils.setFullyReadColumns(jobConf);
+ if (list != null) {
+ ColumnProjectionUtils.appendReadColumnIDs(jobConf, list);
+ } else {
+ ColumnProjectionUtils.setFullyReadColumns(jobConf);
+ }
}
}
}
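For readers following the reformatted getPartitionDescFromPath above, here is a standalone sketch of the same two-step lookup it performs, written against a plain Map so it runs outside Hive (partitionDesc is replaced by String for the example).

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.fs.Path;

    public class PartitionLookupSketch {
      static String lookup(Map<String, String> pathToPartitionInfo, Path dir)
          throws IOException {
        // First try the full URI form (e.g. hdfs://namenode/warehouse/t/ds=1),
        // then fall back to the scheme-less path (/warehouse/t/ds=1).
        String part = pathToPartitionInfo.get(dir.toString());
        if (part == null) {
          part = pathToPartitionInfo.get(dir.toUri().getPath());
        }
        if (part == null) {
          throw new IOException("cannot find dir = " + dir + " in partToPartitionInfo!");
        }
        return part;
      }

      public static void main(String[] args) throws IOException {
        Map<String, String> info = new HashMap<String, String>();
        info.put("/warehouse/t/ds=1", "partition-1");
        System.out.println(lookup(info, new Path("hdfs://nn:8020/warehouse/t/ds=1")));
      }
    }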
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java Thu Jan 21 10:37:58 2010
@@ -21,46 +21,52 @@
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.WritableComparator;
-/** HiveKey is a simple wrapper on Text which allows us to set the hashCode easily.
- * hashCode is used for hadoop partitioner.
- */
+/**
+ * HiveKey is a simple wrapper on Text which allows us to set the hashCode
+ * easily. hashCode is used for hadoop partitioner.
+ */
public class HiveKey extends BytesWritable {
private static final int LENGTH_BYTES = 4;
-
+
boolean hashCodeValid;
+
public HiveKey() {
- hashCodeValid = false;
+ hashCodeValid = false;
}
-
- protected int myHashCode;
+
+ protected int myHashCode;
+
public void setHashCode(int myHashCode) {
- this.hashCodeValid = true;
+ hashCodeValid = true;
this.myHashCode = myHashCode;
}
+
+ @Override
public int hashCode() {
if (!hashCodeValid) {
- throw new RuntimeException("Cannot get hashCode() from deserialized " + HiveKey.class);
+ throw new RuntimeException("Cannot get hashCode() from deserialized "
+ + HiveKey.class);
}
return myHashCode;
}
- /** A Comparator optimized for HiveKey. */
+ /** A Comparator optimized for HiveKey. */
public static class Comparator extends WritableComparator {
public Comparator() {
super(HiveKey.class);
}
-
+
/**
* Compare the buffers in serialized form.
*/
- public int compare(byte[] b1, int s1, int l1,
- byte[] b2, int s2, int l2) {
- return compareBytes(b1, s1+LENGTH_BYTES, l1-LENGTH_BYTES,
- b2, s2+LENGTH_BYTES, l2-LENGTH_BYTES);
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ return compareBytes(b1, s1 + LENGTH_BYTES, l1 - LENGTH_BYTES, b2, s2
+ + LENGTH_BYTES, l2 - LENGTH_BYTES);
}
}
-
+
static {
WritableComparator.define(HiveKey.class, new Comparator());
}
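A small sketch of the contract the cleaned-up HiveKey enforces: the hash code must be set explicitly before the key reaches a partitioner, and hashCode() throws on a key whose hash was never set. The payload bytes and hash function below are arbitrary choices for illustration.

    import org.apache.hadoop.hive.ql.io.HiveKey;

    public class HiveKeySketch {
      public static void main(String[] args) {
        byte[] payload = "row-key".getBytes();

        HiveKey key = new HiveKey();
        key.set(payload, 0, payload.length);          // inherited BytesWritable.set
        key.setHashCode(java.util.Arrays.hashCode(payload));

        // Typical partitioner-style use of the explicit hash code
        int numReducers = 32;
        int bucket = (key.hashCode() & Integer.MAX_VALUE) % numReducers;
        System.out.println("bucket = " + bucket);

        // Calling new HiveKey().hashCode() would throw instead: hashCodeValid stays
        // false until setHashCode is called, as shown in the diff above.
      }
    }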
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java Thu Jan 21 10:37:58 2010
@@ -30,9 +30,10 @@
/**
* <code>HiveOutputFormat</code> describes the output-specification for Hive's
- * operators. It has a method {@link #getHiveRecordWriter(JobConf, Path, Class,
- * boolean, Properties, Progressable)}, with various parameters used to create
- * the final out file and get some specific settings.
+ * operators. It has a method
+ * {@link #getHiveRecordWriter(JobConf, Path, Class, boolean, Properties, Progressable)}
+ * , with various parameters used to create the final out file and get some
+ * specific settings.
*
* @see org.apache.hadoop.mapred.OutputFormat
* @see RecordWriter
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java Thu Jan 21 10:37:58 2010
@@ -18,44 +18,46 @@
package org.apache.hadoop.hive.ql.io;
+import java.io.IOException;
+
import org.apache.hadoop.hive.ql.exec.ExecMapper;
-import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import java.io.IOException;
+import org.apache.hadoop.mapred.RecordReader;
+
+public class HiveRecordReader<K extends WritableComparable, V extends Writable>
+ implements RecordReader<K, V> {
-public class HiveRecordReader<K extends WritableComparable, V extends Writable>
- implements RecordReader<K, V> {
+ private final RecordReader recordReader;
- private RecordReader recordReader;
- public HiveRecordReader(RecordReader recordReader){
+ public HiveRecordReader(RecordReader recordReader) {
this.recordReader = recordReader;
}
- public void close() throws IOException {
- recordReader.close();
+ public void close() throws IOException {
+ recordReader.close();
}
- public K createKey() {
- return (K)recordReader.createKey();
+ public K createKey() {
+ return (K) recordReader.createKey();
}
- public V createValue() {
- return (V)recordReader.createValue();
+ public V createValue() {
+ return (V) recordReader.createValue();
}
- public long getPos() throws IOException {
+ public long getPos() throws IOException {
return recordReader.getPos();
}
- public float getProgress() throws IOException {
+ public float getProgress() throws IOException {
return recordReader.getProgress();
}
-
- public boolean next(K key, V value) throws IOException {
- if (ExecMapper.getDone())
+
+ public boolean next(K key, V value) throws IOException {
+ if (ExecMapper.getDone()) {
return false;
+ }
return recordReader.next(key, value);
}
}
-
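HiveRecordReader is a thin delegate whose only extra behaviour is the ExecMapper.getDone() short-circuit in next(). A generic sketch of that pattern follows (assumption: an AtomicBoolean stands in for the Hive-internal ExecMapper flag).

    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicBoolean;
    import org.apache.hadoop.mapred.RecordReader;

    /** Delegating reader that stops early once an external "done" flag is set. */
    public class EarlyStopRecordReader<K, V> implements RecordReader<K, V> {
      private final RecordReader<K, V> delegate;
      private final AtomicBoolean done;

      public EarlyStopRecordReader(RecordReader<K, V> delegate, AtomicBoolean done) {
        this.delegate = delegate;
        this.done = done;
      }

      public boolean next(K key, V value) throws IOException {
        if (done.get()) {
          return false;             // mirrors the ExecMapper.getDone() check above
        }
        return delegate.next(key, value);
      }

      public K createKey() { return delegate.createKey(); }
      public V createValue() { return delegate.createValue(); }
      public long getPos() throws IOException { return delegate.getPos(); }
      public float getProgress() throws IOException { return delegate.getProgress(); }
      public void close() throws IOException { delegate.close(); }
    }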
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java Thu Jan 21 10:37:58 2010
@@ -34,9 +34,8 @@
import org.apache.hadoop.util.Progressable;
/** A {@link HiveOutputFormat} that writes {@link SequenceFile}s. */
-public class HiveSequenceFileOutputFormat extends
- SequenceFileOutputFormat implements
- HiveOutputFormat<WritableComparable, Writable> {
+public class HiveSequenceFileOutputFormat extends SequenceFileOutputFormat
+ implements HiveOutputFormat<WritableComparable, Writable> {
BytesWritable EMPTY_KEY = new BytesWritable();
@@ -64,8 +63,7 @@
FileSystem fs = finalOutPath.getFileSystem(jc);
final SequenceFile.Writer outStream = Utilities.createSequenceWriter(jc,
- fs, finalOutPath, BytesWritable.class, valueClass,
- isCompressed);
+ fs, finalOutPath, BytesWritable.class, valueClass, isCompressed);
return new RecordWriter() {
public void write(Writable r) throws IOException {
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IgnoreKeyTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IgnoreKeyTextOutputFormat.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IgnoreKeyTextOutputFormat.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/IgnoreKeyTextOutputFormat.java Thu Jan 21 10:37:58 2010
@@ -30,42 +30,39 @@
import org.apache.hadoop.util.Progressable;
/**
- * This class replaces key with null before feeding the <key, value>
- * to TextOutputFormat.RecordWriter.
+ * This class replaces key with null before feeding the <key, value> to
+ * TextOutputFormat.RecordWriter.
*
* @deprecated use {@link HiveIgnoreKeyTextOutputFormat} instead}
*/
-public class IgnoreKeyTextOutputFormat<K extends WritableComparable,
- V extends Writable>
- extends TextOutputFormat<K, V> {
-
- protected static class IgnoreKeyWriter<K extends WritableComparable,
- V extends Writable>
- implements RecordWriter<K, V> {
-
- private RecordWriter<K, V> mWriter;
-
+@Deprecated
+public class IgnoreKeyTextOutputFormat<K extends WritableComparable, V extends Writable>
+ extends TextOutputFormat<K, V> {
+
+ protected static class IgnoreKeyWriter<K extends WritableComparable, V extends Writable>
+ implements RecordWriter<K, V> {
+
+ private final RecordWriter<K, V> mWriter;
+
public IgnoreKeyWriter(RecordWriter<K, V> writer) {
this.mWriter = writer;
}
-
+
public synchronized void write(K key, V value) throws IOException {
- this.mWriter.write(null, value);
+ this.mWriter.write(null, value);
}
public void close(Reporter reporter) throws IOException {
this.mWriter.close(reporter);
}
}
-
- public RecordWriter<K, V> getRecordWriter(FileSystem ignored,
- JobConf job,
- String name,
- Progressable progress)
- throws IOException {
-
- return new IgnoreKeyWriter<K, V>(super.getRecordWriter(ignored, job, name, progress));
+
+ @Override
+ public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,
+ String name, Progressable progress) throws IOException {
+
+ return new IgnoreKeyWriter<K, V>(super.getRecordWriter(ignored, job, name,
+ progress));
}
-
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/InputFormatChecker.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/InputFormatChecker.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/InputFormatChecker.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/InputFormatChecker.java Thu Jan 21 10:37:58 2010
@@ -30,10 +30,11 @@
*/
public interface InputFormatChecker {
- /**
- * This method is used to validate the input files
- *
- */
- public boolean validateInput(FileSystem fs, HiveConf conf, ArrayList<FileStatus> files) throws IOException;
+ /**
+ * This method is used to validate the input files
+ *
+ */
+ public boolean validateInput(FileSystem fs, HiveConf conf,
+ ArrayList<FileStatus> files) throws IOException;
}
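The reformatted interface above has a single method; a minimal hypothetical implementation is sketched below. It only rejects directories and empty files, whereas real checkers such as SequenceFileInputFormatChecker (registered in HiveFileFormatUtils earlier in this commit) inspect file headers.

    import java.io.IOException;
    import java.util.ArrayList;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.io.InputFormatChecker;

    public class NonEmptyFileChecker implements InputFormatChecker {
      public boolean validateInput(FileSystem fs, HiveConf conf,
          ArrayList<FileStatus> files) throws IOException {
        for (FileStatus file : files) {
          if (file.isDir() || file.getLen() == 0) {
            return false;      // reject directories and zero-length files
          }
        }
        return true;
      }
    }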
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataInputBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataInputBuffer.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataInputBuffer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataInputBuffer.java Thu Jan 21 10:37:58 2010
@@ -31,9 +31,10 @@
* A thread-not-safe version of Hadoop's DataInputBuffer, which removes all
* synchronized modifiers.
*/
-public class NonSyncDataInputBuffer extends FilterInputStream implements DataInput {
+public class NonSyncDataInputBuffer extends FilterInputStream implements
+ DataInput {
- private NonSyncByteArrayInputStream buffer;
+ private final NonSyncByteArrayInputStream buffer;
byte[] buff = new byte[16];
@@ -77,7 +78,7 @@
*
* @throws IOException
* If a problem occurs reading from this DataInputStream.
- *
+ *
*/
@Override
public final int read(byte[] buffer) throws IOException {
@@ -156,8 +157,9 @@
while (offset < count) {
int bytesRead = in.read(buff, offset, count - offset);
- if (bytesRead == -1)
+ if (bytesRead == -1) {
return bytesRead;
+ }
offset += bytesRead;
}
return offset;
@@ -464,22 +466,26 @@
int utfSize) throws UTFDataFormatException {
int count = 0, s = 0, a;
while (count < utfSize) {
- if ((out[s] = (char) buf[offset + count++]) < '\u0080')
+ if ((out[s] = (char) buf[offset + count++]) < '\u0080') {
s++;
- else if (((a = out[s]) & 0xe0) == 0xc0) {
- if (count >= utfSize)
+ } else if (((a = out[s]) & 0xe0) == 0xc0) {
+ if (count >= utfSize) {
throw new UTFDataFormatException();
+ }
int b = buf[count++];
- if ((b & 0xC0) != 0x80)
+ if ((b & 0xC0) != 0x80) {
throw new UTFDataFormatException();
+ }
out[s++] = (char) (((a & 0x1F) << 6) | (b & 0x3F));
} else if ((a & 0xf0) == 0xe0) {
- if (count + 1 >= utfSize)
+ if (count + 1 >= utfSize) {
throw new UTFDataFormatException();
+ }
int b = buf[count++];
int c = buf[count++];
- if (((b & 0xC0) != 0x80) || ((c & 0xC0) != 0x80))
+ if (((b & 0xC0) != 0x80) || ((c & 0xC0) != 0x80)) {
throw new UTFDataFormatException();
+ }
out[s++] = (char) (((a & 0x0F) << 12) | ((b & 0x3F) << 6) | (c & 0x3F));
} else {
throw new UTFDataFormatException();
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataOutputBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataOutputBuffer.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataOutputBuffer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/NonSyncDataOutputBuffer.java Thu Jan 21 10:37:58 2010
@@ -29,7 +29,7 @@
*/
public class NonSyncDataOutputBuffer extends DataOutputStream {
- private NonSyncByteArrayOutputStream buffer;
+ private final NonSyncByteArrayOutputStream buffer;
/** Constructs a new empty buffer. */
public NonSyncDataOutputBuffer() {
@@ -56,7 +56,7 @@
/** Resets the buffer to empty. */
public NonSyncDataOutputBuffer reset() {
- this.written = 0;
+ written = 0;
buffer.reset();
return this;
}
@@ -66,11 +66,13 @@
buffer.write(in, length);
}
+ @Override
public void write(int b) throws IOException {
buffer.write(b);
incCount(1);
}
+ @Override
public void write(byte b[], int off, int len) throws IOException {
buffer.write(b, off, len);
incCount(len);
@@ -79,7 +81,8 @@
private void incCount(int value) {
if (written + value < 0) {
written = Integer.MAX_VALUE;
- } else
+ } else {
written += value;
+ }
}
}
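A round-trip sketch with the two unsynchronized buffers touched above (not from the patch): write through the DataOutputStream API that NonSyncDataOutputBuffer extends, then re-read the backing array. The reset(byte[], int) call on the input buffer is assumed to follow Hadoop's DataInputBuffer convention, which matches how RCFile uses these buffers later in this commit.

    import java.io.IOException;
    import org.apache.hadoop.hive.ql.io.NonSyncDataInputBuffer;
    import org.apache.hadoop.hive.ql.io.NonSyncDataOutputBuffer;

    public class NonSyncBufferSketch {
      public static void main(String[] args) throws IOException {
        NonSyncDataOutputBuffer out = new NonSyncDataOutputBuffer();
        out.writeInt(42);
        out.writeUTF("hive");

        // Point the input buffer at the written bytes (buffer plus valid length)
        NonSyncDataInputBuffer in = new NonSyncDataInputBuffer();
        in.reset(out.getData(), out.getLength());
        System.out.println(in.readInt() + " " + in.readUTF());
      }
    }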
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Thu Jan 21 10:37:58 2010
@@ -37,7 +37,6 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.CodecPool;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
@@ -221,7 +220,8 @@
* @param colValLenBuffer
* each cell's length of this column's in this split
*/
- void setColumnLenInfo(int columnValueLen, NonSyncDataOutputBuffer colValLenBuffer,
+ void setColumnLenInfo(int columnValueLen,
+ NonSyncDataOutputBuffer colValLenBuffer,
int columnUncompressedValueLen, int columnIndex) {
eachColumnValueLen[columnIndex] = columnValueLen;
eachColumnUncompressedValueLen[columnIndex] = columnUncompressedValueLen;
@@ -239,10 +239,11 @@
eachColumnValueLen[i] = WritableUtils.readVInt(in);
eachColumnUncompressedValueLen[i] = WritableUtils.readVInt(in);
int bufLen = WritableUtils.readVInt(in);
- if (allCellValLenBuffer[i] == null)
+ if (allCellValLenBuffer[i] == null) {
allCellValLenBuffer[i] = new NonSyncDataOutputBuffer();
- else
+ } else {
allCellValLenBuffer[i].reset();
+ }
allCellValLenBuffer[i].write(in, bufLen);
}
}
@@ -292,12 +293,12 @@
* </ul>
*/
static class ValueBuffer implements Writable {
-
+
class LazyDecompressionCallbackImpl implements LazyDecompressionCallback {
-
+
int index = -1;
int colIndex = -1;
-
+
public LazyDecompressionCallbackImpl(int index, int colIndex) {
super();
this.index = index;
@@ -306,27 +307,30 @@
@Override
public byte[] decompress() throws IOException {
-
- if (decompressedFlag[index] || codec == null)
+
+ if (decompressedFlag[index] || codec == null) {
return loadedColumnsValueBuffer[index].getData();
-
+ }
+
NonSyncDataOutputBuffer compressedData = loadedColumnsValueBuffer[index];
NonSyncDataOutputBuffer decompressedData = new NonSyncDataOutputBuffer();
decompressBuffer.reset();
DataInputStream valueIn = new DataInputStream(deflatFilter);
deflatFilter.resetState();
- decompressBuffer.reset(compressedData.getData(), keyBuffer.eachColumnValueLen[colIndex]);
- decompressedData.write(valueIn, keyBuffer.eachColumnUncompressedValueLen[colIndex]);
+ decompressBuffer.reset(compressedData.getData(),
+ keyBuffer.eachColumnValueLen[colIndex]);
+ decompressedData.write(valueIn,
+ keyBuffer.eachColumnUncompressedValueLen[colIndex]);
loadedColumnsValueBuffer[index] = decompressedData;
decompressedFlag[index] = true;
return decompressedData.getData();
}
}
-
+
// used to load columns' value into memory
private NonSyncDataOutputBuffer[] loadedColumnsValueBuffer = null;
private boolean[] decompressedFlag = null;
- private LazyDecompressionCallbackImpl[] lazyDecompressCallbackObjs = null;
+ private LazyDecompressionCallbackImpl[] lazyDecompressCallbackObjs = null;
boolean inited = false;
@@ -362,19 +366,24 @@
skippedColIDs = skippedCols;
} else {
skippedColIDs = new boolean[columnNumber];
- for (int i = 0; i < skippedColIDs.length; i++)
+ for (int i = 0; i < skippedColIDs.length; i++) {
skippedColIDs[i] = false;
+ }
}
int skipped = 0;
if (skippedColIDs != null) {
- for (boolean currentSkip : skippedColIDs)
- if (currentSkip)
+ for (boolean currentSkip : skippedColIDs) {
+ if (currentSkip) {
skipped++;
+ }
+ }
}
- loadedColumnsValueBuffer = new NonSyncDataOutputBuffer[columnNumber - skipped];
+ loadedColumnsValueBuffer = new NonSyncDataOutputBuffer[columnNumber
+ - skipped];
decompressedFlag = new boolean[columnNumber - skipped];
- lazyDecompressCallbackObjs = new LazyDecompressionCallbackImpl[columnNumber - skipped];
+ lazyDecompressCallbackObjs = new LazyDecompressionCallbackImpl[columnNumber
+ - skipped];
this.codec = codec;
if (codec != null) {
valDecompressor = CodecPool.getDecompressor(codec);
@@ -383,12 +392,14 @@
}
for (int k = 0, readIndex = 0; k < columnNumber; k++) {
- if (skippedColIDs[k])
+ if (skippedColIDs[k]) {
continue;
+ }
loadedColumnsValueBuffer[readIndex] = new NonSyncDataOutputBuffer();
- if(codec != null) {
+ if (codec != null) {
decompressedFlag[readIndex] = false;
- lazyDecompressCallbackObjs[readIndex] = new LazyDecompressionCallbackImpl(readIndex, k);
+ lazyDecompressCallbackObjs[readIndex] = new LazyDecompressionCallbackImpl(
+ readIndex, k);
} else {
decompressedFlag[readIndex] = true;
}
@@ -396,7 +407,8 @@
}
}
- public void setColumnValueBuffer(NonSyncDataOutputBuffer valBuffer, int addIndex) {
+ public void setColumnValueBuffer(NonSyncDataOutputBuffer valBuffer,
+ int addIndex) {
loadedColumnsValueBuffer[addIndex] = valBuffer;
}
@@ -420,8 +432,9 @@
NonSyncDataOutputBuffer valBuf = loadedColumnsValueBuffer[addIndex];
valBuf.reset();
valBuf.write(in, vaRowsLen);
- if(codec != null)
+ if (codec != null) {
decompressedFlag[addIndex] = false;
+ }
addIndex++;
}
@@ -432,8 +445,7 @@
@Override
public void write(DataOutput out) throws IOException {
- for (int i = 0; i < loadedColumnsValueBuffer.length; i++) {
- NonSyncDataOutputBuffer currentBuf = loadedColumnsValueBuffer[i];
+ for (NonSyncDataOutputBuffer currentBuf : loadedColumnsValueBuffer) {
out.write(currentBuf.getData(), 0, currentBuf.getLength());
}
}
@@ -443,8 +455,8 @@
}
public void close() {
- for (int i = 0; i < loadedColumnsValueBuffer.length; i++) {
- IOUtils.closeStream(loadedColumnsValueBuffer[i]);
+ for (NonSyncDataOutputBuffer element : loadedColumnsValueBuffer) {
+ IOUtils.closeStream(element);
}
if (codec != null) {
IOUtils.closeStream(decompressBuffer);
@@ -496,7 +508,7 @@
NonSyncDataOutputBuffer[] compressionBuffer;
CompressionOutputStream[] deflateFilter = null;
DataOutputStream[] deflateOut = null;
- private ColumnBuffer[] columnBuffers;
+ private final ColumnBuffer[] columnBuffers;
NonSyncDataOutputBuffer keyCompressionBuffer;
CompressionOutputStream keyDeflateFilter;
@@ -505,7 +517,7 @@
private int columnNumber = 0;
- private int[] columnValuePlainLength;
+ private final int[] columnValuePlainLength;
KeyBuffer key = null;
ValueBuffer value = null;
@@ -528,7 +540,7 @@
*/
int runLength = 0;
int prevValueLength = -1;
-
+
ColumnBuffer() throws IOException {
columnValBuffer = new NonSyncDataOutputBuffer();
valLenBuffer = new NonSyncDataOutputBuffer();
@@ -537,17 +549,17 @@
public void append(BytesRefWritable data) throws IOException {
data.writeDataTo(columnValBuffer);
int currentLen = data.getLength();
-
- if( prevValueLength < 0) {
+
+ if (prevValueLength < 0) {
startNewGroup(currentLen);
return;
}
-
- if(currentLen != prevValueLength) {
+
+ if (currentLen != prevValueLength) {
flushGroup();
startNewGroup(currentLen);
} else {
- runLength ++;
+ runLength++;
}
}
@@ -563,12 +575,13 @@
prevValueLength = -1;
runLength = 0;
}
-
+
public void flushGroup() throws IOException {
if (prevValueLength >= 0) {
WritableUtils.writeVLong(valLenBuffer, prevValueLength);
- if (runLength > 0)
+ if (runLength > 0) {
WritableUtils.writeVLong(valLenBuffer, ~runLength);
+ }
runLength = -1;
prevValueLength = -1;
}
@@ -645,8 +658,9 @@
RECORD_INTERVAL = conf.getInt(RECORD_INTERVAL_CONF_STR, RECORD_INTERVAL);
columnNumber = conf.getInt(COLUMN_NUMBER_CONF_STR, 0);
- if (metadata == null)
+ if (metadata == null) {
metadata = new Metadata();
+ }
metadata.set(new Text(COLUMN_NUMBER_METADATA_STR), new Text(""
+ columnNumber));
@@ -766,9 +780,10 @@
*/
public void append(Writable val) throws IOException {
- if (!(val instanceof BytesRefArrayWritable))
+ if (!(val instanceof BytesRefArrayWritable)) {
throw new UnsupportedOperationException(
"Currently the writer can only accept BytesRefArrayWritable");
+ }
BytesRefArrayWritable columns = (BytesRefArrayWritable) val;
int size = columns.size();
@@ -802,7 +817,7 @@
for (int columnIndex = 0; columnIndex < columnNumber; columnIndex++) {
ColumnBuffer currentBuf = columnBuffers[columnIndex];
currentBuf.flushGroup();
-
+
NonSyncDataOutputBuffer columnValue = currentBuf.columnValBuffer;
if (isCompressed()) {
@@ -829,8 +844,9 @@
}
int keyLength = key.getSize();
- if (keyLength < 0)
+ if (keyLength < 0) {
throw new IOException("negative length keys not allowed: " + key);
+ }
// Write the record out
checkAndWriteSync(); // sync
@@ -865,8 +881,9 @@
}
public synchronized void close() throws IOException {
- if (bufferedRecords > 0)
+ if (bufferedRecords > 0) {
flushRecords();
+ }
clearColumnBuffers();
if (isCompressed()) {
@@ -898,25 +915,25 @@
*/
public static class Reader {
- private Path file;
- private FSDataInputStream in;
+ private final Path file;
+ private final FSDataInputStream in;
private byte version;
private CompressionCodec codec = null;
private Metadata metadata = null;
- private byte[] sync = new byte[SYNC_HASH_SIZE];
- private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
+ private final byte[] sync = new byte[SYNC_HASH_SIZE];
+ private final byte[] syncCheck = new byte[SYNC_HASH_SIZE];
private boolean syncSeen;
- private long end;
+ private final long end;
private int currentKeyLength;
private int currentRecordLength;
- private Configuration conf;
+ private final Configuration conf;
- private ValueBuffer currentValue;
+ private final ValueBuffer currentValue;
private boolean[] skippedColIDs = null;
@@ -931,14 +948,14 @@
private int passedRowsNum = 0;
private int[] columnRowReadIndex = null;
- private NonSyncDataInputBuffer[] colValLenBufferReadIn;
- private int[] columnRunLength;
- private int[] columnPrvLength;
+ private final NonSyncDataInputBuffer[] colValLenBufferReadIn;
+ private final int[] columnRunLength;
+ private final int[] columnPrvLength;
private boolean decompress = false;
private Decompressor keyDecompressor;
NonSyncDataOutputBuffer keyDecompressedData = new NonSyncDataOutputBuffer();
-
+
int[] prjColIDs = null; // selected column IDs
/** Create a new RCFile reader. */
@@ -966,15 +983,17 @@
columnNumber = Integer.parseInt(metadata.get(
new Text(COLUMN_NUMBER_METADATA_STR)).toString());
- java.util.ArrayList<Integer> notSkipIDs = ColumnProjectionUtils.getReadColumnIDs(conf);
+ java.util.ArrayList<Integer> notSkipIDs = ColumnProjectionUtils
+ .getReadColumnIDs(conf);
skippedColIDs = new boolean[columnNumber];
if (notSkipIDs.size() > 0) {
for (int i = 0; i < skippedColIDs.length; i++) {
skippedColIDs[i] = true;
}
for (int read : notSkipIDs) {
- if (read < columnNumber)
+ if (read < columnNumber) {
skippedColIDs[read] = false;
+ }
}
} else {
// TODO: if no column name is specified e.g, in select count(1) from tt;
@@ -987,15 +1006,16 @@
loadColumnNum = columnNumber;
if (skippedColIDs != null && skippedColIDs.length > 0) {
- for (int i = 0; i < skippedColIDs.length; i++) {
- if (skippedColIDs[i])
+ for (boolean skippedColID : skippedColIDs) {
+ if (skippedColID) {
loadColumnNum -= 1;
+ }
}
}
// get list of selected column IDs
prjColIDs = new int[loadColumnNum];
- for ( int i = 0, j = 0; i < columnNumber; ++i ) {
+ for (int i = 0, j = 0; i < columnNumber; ++i) {
if (!skippedColIDs[i]) {
prjColIDs[j++] = i;
}
@@ -1007,8 +1027,9 @@
columnRowReadIndex = new int[columnNumber];
for (int i = 0; i < columnNumber; i++) {
columnRowReadIndex[i] = 0;
- if (!skippedColIDs[i])
+ if (!skippedColIDs[i]) {
colValLenBufferReadIn[i] = new NonSyncDataInputBuffer();
+ }
columnRunLength[i] = 0;
columnPrvLength[i] = -1;
}
@@ -1022,20 +1043,23 @@
in.readFully(versionBlock);
if ((versionBlock[0] != VERSION[0]) || (versionBlock[1] != VERSION[1])
- || (versionBlock[2] != VERSION[2]))
+ || (versionBlock[2] != VERSION[2])) {
throw new IOException(file + " not a RCFile");
+ }
// Set 'version'
version = versionBlock[3];
- if (version > VERSION[3])
+ if (version > VERSION[3]) {
throw new VersionMismatchException(VERSION[3], version);
+ }
try {
Class<?> keyCls = conf.getClassByName(Text.readString(in));
Class<?> valCls = conf.getClassByName(Text.readString(in));
if (!keyCls.equals(KeyBuffer.class)
- || !valCls.equals(ValueBuffer.class))
+ || !valCls.equals(ValueBuffer.class)) {
throw new IOException(file + " not a RCFile");
+ }
} catch (ClassNotFoundException e) {
throw new IOException(file + " not a RCFile", e);
}
@@ -1110,8 +1134,9 @@
for (int i = 0; in.getPos() < end; i++) {
int j = 0;
for (; j < syncLen; j++) {
- if (sync[j] != syncCheck[(i + j) % syncLen])
+ if (sync[j] != syncCheck[(i + j) % syncLen]) {
break;
+ }
}
if (j == syncLen) {
in.seek(in.getPos() - SYNC_SIZE); // position before
@@ -1160,8 +1185,9 @@
// a
// sync entry
in.readFully(syncCheck); // read syncCheck
- if (!Arrays.equals(sync, syncCheck)) // check it
+ if (!Arrays.equals(sync, syncCheck)) {
throw new IOException("File is corrupt!");
+ }
syncSeen = true;
if (in.getPos() >= end) {
return -1;
@@ -1174,8 +1200,9 @@
}
private void seekToNextKeyBuffer() throws IOException {
- if (!keyInit)
+ if (!keyInit) {
return;
+ }
if (!currentValue.inited) {
in.skip(currentRecordLength - currentKeyLength);
}
@@ -1220,21 +1247,22 @@
readRowsIndexInBuffer = 0;
recordsNumInValBuffer = currentKey.numberRows;
- for (int j = 0; j < prjColIDs.length; j++) {
- int i = prjColIDs[j];
+ for (int prjColID : prjColIDs) {
+ int i = prjColID;
colValLenBufferReadIn[i].reset(currentKey.allCellValLenBuffer[i]
.getData(), currentKey.allCellValLenBuffer[i].getLength());
columnRowReadIndex[i] = 0;
columnRunLength[i] = 0;
columnPrvLength[i] = -1;
}
-
+
return currentKeyLength;
}
protected void currentValueBuffer() throws IOException {
- if (!keyInit)
+ if (!keyInit) {
nextKeyBuffer();
+ }
currentValue.keyBuffer = currentKey;
currentValue.clearColumnBuffer();
currentValue.readFields(in);
@@ -1245,7 +1273,7 @@
// use this buffer to hold column's cells value length for usages in
// getColumn(), instead of using colValLenBufferReadIn directly.
- private NonSyncDataInputBuffer fetchColumnTempBuf = new NonSyncDataInputBuffer();
+ private final NonSyncDataInputBuffer fetchColumnTempBuf = new NonSyncDataInputBuffer();
/**
* Fetch all data in the buffer for a given column. This is useful for
@@ -1271,8 +1299,9 @@
rest.resetValid(recordsNumInValBuffer);
- if (!currentValue.inited)
+ if (!currentValue.inited) {
currentValueBuffer();
+ }
int columnNextRowStart = 0;
fetchColumnTempBuf.reset(currentKey.allCellValLenBuffer[columnID]
@@ -1281,10 +1310,13 @@
int length = getColumnNextValueLength(columnID);
BytesRefWritable currentCell = rest.get(i);
- if (currentValue.decompressedFlag[columnID])
- currentCell.set(currentValue.loadedColumnsValueBuffer[columnID].getData(), columnNextRowStart, length);
- else
- currentCell.set(currentValue.lazyDecompressCallbackObjs[columnID], columnNextRowStart, length);
+ if (currentValue.decompressedFlag[columnID]) {
+ currentCell.set(currentValue.loadedColumnsValueBuffer[columnID]
+ .getData(), columnNextRowStart, length);
+ } else {
+ currentCell.set(currentValue.lazyDecompressCallbackObjs[columnID],
+ columnNextRowStart, length);
+ }
columnNextRowStart = columnNextRowStart + length;
}
return rest;
@@ -1292,8 +1324,9 @@
/**
* Read in next key buffer and throw any data in current key buffer and
- * current value buffer. It will influence the result of {@link
- * #next(LongWritable)} and {@link #getCurrentRow(BytesRefArrayWritable)}
+ * current value buffer. It will influence the result of
+ * {@link #next(LongWritable)} and
+ * {@link #getCurrentRow(BytesRefArrayWritable)}
*
* @return whether there still has records or not
* @throws IOException
@@ -1349,14 +1382,15 @@
public synchronized void getCurrentRow(BytesRefArrayWritable ret)
throws IOException {
- if (!keyInit || rowFetched)
+ if (!keyInit || rowFetched) {
return;
+ }
if (!currentValue.inited) {
currentValueBuffer();
- // do this only when not initialized, but we may need to find a way to
- // tell the caller how to initialize the valid size
- ret.resetValid(columnNumber);
+ // do this only when not initialized, but we may need to find a way to
+ // tell the caller how to initialize the valid size
+ ret.resetValid(columnNumber);
}
// we do not use BytesWritable here to avoid the byte-copy from
@@ -1367,14 +1401,17 @@
BytesRefWritable ref = ret.unCheckedGet(i);
- int columnCurrentRowStart = (int) columnRowReadIndex[i];
+ int columnCurrentRowStart = columnRowReadIndex[i];
int length = getColumnNextValueLength(i);
columnRowReadIndex[i] = columnCurrentRowStart + length;
- if (currentValue.decompressedFlag[j])
- ref.set(currentValue.loadedColumnsValueBuffer[j].getData(), columnCurrentRowStart, length);
- else
- ref.set(currentValue.lazyDecompressCallbackObjs[j], columnCurrentRowStart, length);
+ if (currentValue.decompressedFlag[j]) {
+ ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
+ columnCurrentRowStart, length);
+ } else {
+ ref.set(currentValue.lazyDecompressCallbackObjs[j],
+ columnCurrentRowStart, length);
+ }
}
rowFetched = true;
}
@@ -1405,6 +1442,7 @@
}
/** Returns the name of the file. */
+ @Override
public String toString() {
return file.toString();
}
@@ -1413,7 +1451,7 @@
public void close() {
IOUtils.closeStream(in);
currentValue.close();
- if (this.decompress) {
+ if (decompress) {
IOUtils.closeStream(keyDecompressedData);
CodecPool.returnDecompressor(keyDecompressor);
}
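
For orientation, a minimal sketch of driving the reworked RCFile.Reader row by row. The Reader(fs, file, conf) constructor and the next(LongWritable), getCurrentRow(BytesRefArrayWritable) and close() calls all appear in the hunks above; the imports and surrounding setup here are assumptions for illustration, not part of this commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.io.LongWritable;

public class RCFileReadSketch {
  public static long countRows(Path file) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    // Reader(fs, file, conf), as also used by RCFileInputFormat.validateInput() below.
    RCFile.Reader reader = new RCFile.Reader(fs, file, conf);
    LongWritable rowID = new LongWritable();
    BytesRefArrayWritable row = new BytesRefArrayWritable();
    long rows = 0;
    try {
      // next() advances to the next row; getCurrentRow() fills the reusable
      // container with references into the current column buffers.
      while (reader.next(rowID)) {
        reader.getCurrentRow(row);
        rows++;
      }
    } finally {
      reader.close();
    }
    return rows;
  }
}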
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileInputFormat.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileInputFormat.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileInputFormat.java Thu Jan 21 10:37:58 2010
@@ -35,12 +35,13 @@
import org.apache.hadoop.mapred.Reporter;
public class RCFileInputFormat<K extends LongWritable, V extends BytesRefArrayWritable>
- extends FileInputFormat<K, V> implements InputFormatChecker{
+ extends FileInputFormat<K, V> implements InputFormatChecker {
public RCFileInputFormat() {
setMinSplitSize(SequenceFile.SYNC_INTERVAL);
}
+ @Override
@SuppressWarnings("unchecked")
public RecordReader<K, V> getRecordReader(InputSplit split, JobConf job,
Reporter reporter) throws IOException {
@@ -53,11 +54,13 @@
@Override
public boolean validateInput(FileSystem fs, HiveConf conf,
ArrayList<FileStatus> files) throws IOException {
- if (files.size() <= 0)
+ if (files.size() <= 0) {
return false;
+ }
for (int fileId = 0; fileId < files.size(); fileId++) {
try {
- RCFile.Reader reader = new RCFile.Reader(fs, files.get(fileId).getPath(), conf);
+ RCFile.Reader reader = new RCFile.Reader(fs, files.get(fileId)
+ .getPath(), conf);
reader.close();
} catch (IOException e) {
return false;
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java Thu Jan 21 10:37:58 2010
@@ -67,6 +67,7 @@
}
/** {@inheritDoc} */
+ @Override
public RecordWriter<WritableComparable, BytesRefArrayWritable> getRecordWriter(
FileSystem ignored, JobConf job, String name, Progressable progress)
throws IOException {
@@ -124,10 +125,11 @@
String[] cols = null;
String columns = tableProperties.getProperty("columns");
- if (columns == null || columns.trim().equals(""))
+ if (columns == null || columns.trim().equals("")) {
cols = new String[0];
- else
+ } else {
cols = StringUtils.split(columns, ",");
+ }
RCFileOutputFormat.setColumnNumber(jc, cols.length);
final RCFile.Writer outWriter = Utilities.createRCFileWriter(jc, FileSystem
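
On the write side, a minimal sketch of the pattern the output-format changes above wire up: fix the column count in the job conf, then append one BytesRefArrayWritable per row. setColumnNumber() and append() appear in this commit; the RCFile.Writer(fs, conf, path) constructor and the cell payloads are assumed for illustration.

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.mapred.JobConf;

public class RCFileWriteSketch {
  public static void writeOneRow(Path file) throws Exception {
    JobConf conf = new JobConf();
    FileSystem fs = FileSystem.get(conf);

    // The writer must know the column count up front.
    RCFileOutputFormat.setColumnNumber(conf, 2);
    RCFile.Writer writer = new RCFile.Writer(fs, conf, file);

    BytesRefArrayWritable row = new BytesRefArrayWritable(2);
    row.set(0, new BytesRefWritable("key1".getBytes("UTF-8")));
    row.set(1, new BytesRefWritable("value1".getBytes("UTF-8")));
    writer.append(row); // only BytesRefArrayWritable is accepted, per the diff
    writer.close();
  }
}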
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileRecordReader.java Thu Jan 21 10:37:58 2010
@@ -33,9 +33,9 @@
public class RCFileRecordReader<K extends LongWritable, V extends BytesRefArrayWritable>
implements RecordReader<LongWritable, BytesRefArrayWritable> {
- private Reader in;
- private long start;
- private long end;
+ private final Reader in;
+ private final long start;
+ private final long end;
private boolean more = true;
protected Configuration conf;
@@ -47,8 +47,9 @@
this.end = split.getStart() + split.getLength();
this.conf = conf;
- if (split.getStart() > in.getPosition())
+ if (split.getStart() > in.getPosition()) {
in.sync(split.getStart()); // sync to start
+ }
this.start = in.getPosition();
more = start < end;
@@ -76,8 +77,9 @@
@Override
public boolean next(LongWritable key, BytesRefArrayWritable value)
throws IOException {
- if (!more)
+ if (!more) {
return false;
+ }
long pos = in.getPosition();
boolean hasMore = in.next(key);
if (hasMore) {
@@ -96,8 +98,9 @@
}
protected boolean next(LongWritable key) throws IOException {
- if (!more)
+ if (!more) {
return false;
+ }
long pos = in.getPosition();
boolean hasMore = in.next(key);
if (pos >= end && in.syncSeen()) {
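
A minimal sketch of consuming RCFileRecordReader through the mapred API, which is the loop the guarded next() calls above serve. The (Configuration, FileSplit) constructor is inferred from the hunk above; the job and split setup are assumed.

import org.apache.hadoop.hive.ql.io.RCFileRecordReader;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;

public class RCFileScanSketch {
  public static long countRows(JobConf job, FileSplit split) throws Exception {
    RCFileRecordReader<LongWritable, BytesRefArrayWritable> reader =
        new RCFileRecordReader<LongWritable, BytesRefArrayWritable>(job, split);
    LongWritable key = reader.createKey();
    BytesRefArrayWritable value = reader.createValue();
    long rows = 0;
    // next(key, value) returns false once the reader has passed the split end.
    while (reader.next(key, value)) {
      rows++;
    }
    reader.close();
    return rows;
  }
}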
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SequenceFileInputFormatChecker.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SequenceFileInputFormatChecker.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SequenceFileInputFormatChecker.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/SequenceFileInputFormatChecker.java Thu Jan 21 10:37:58 2010
@@ -31,8 +31,9 @@
@Override
public boolean validateInput(FileSystem fs, HiveConf conf,
ArrayList<FileStatus> files) throws IOException {
- if (files.size() <= 0)
+ if (files.size() <= 0) {
return false;
+ }
for (int fileId = 0; fileId < files.size(); fileId++) {
try {
SequenceFile.Reader reader = new SequenceFile.Reader(fs, files.get(
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java Thu Jan 21 10:37:58 2010
@@ -29,26 +29,29 @@
import org.apache.hadoop.hive.ql.parse.SemanticException;
/**
- * base class for operator graph walker
- * this class takes list of starting ops and walks them one by one. it maintains list of walked
- * operators (dispatchedList) and a list of operators that are discovered but not yet dispatched
+ * Base class for operator graph walkers. This class takes a list of starting
+ * ops and walks them one by one. It maintains a list of walked operators
+ * (dispatchedList) and a list of operators that are discovered but not yet
+ * dispatched.
*/
public class DefaultGraphWalker implements GraphWalker {
protected Stack<Node> opStack;
- private List<Node> toWalk = new ArrayList<Node>();
- private Set<Node> seenList = new HashSet<Node>();
- private HashMap<Node, Object> retMap = new HashMap<Node, Object>();
- private Dispatcher dispatcher;
+ private final List<Node> toWalk = new ArrayList<Node>();
+ private final Set<Node> seenList = new HashSet<Node>();
+ private final HashMap<Node, Object> retMap = new HashMap<Node, Object>();
+ private final Dispatcher dispatcher;
/**
* Constructor
- * @param disp dispatcher to call for each op encountered
+ *
+ * @param disp
+ * dispatcher to call for each op encountered
*/
public DefaultGraphWalker(Dispatcher disp) {
- this.dispatcher = disp;
+ dispatcher = disp;
opStack = new Stack<Node>();
- }
+ }
/**
* @return the toWalk
@@ -66,8 +69,11 @@
/**
* Dispatch the current operator
- * @param nd node being walked
- * @param ndStack stack of nodes encountered
+ *
+ * @param nd
+ * node being walked
+ * @param ndStack
+ * stack of nodes encountered
* @throws SemanticException
*/
public void dispatch(Node nd, Stack<Node> ndStack) throws SemanticException {
@@ -75,22 +81,24 @@
if (nd.getChildren() != null) {
nodeOutputs = new Object[nd.getChildren().size()];
int i = 0;
- for(Node child: nd.getChildren()) {
+ for (Node child : nd.getChildren()) {
nodeOutputs[i++] = retMap.get(child);
}
}
-
+
Object retVal = dispatcher.dispatch(nd, ndStack, nodeOutputs);
retMap.put(nd, retVal);
}
/**
* starting point for walking
+ *
* @throws SemanticException
*/
- public void startWalking(Collection<Node> startNodes, HashMap<Node, Object> nodeOutput) throws SemanticException {
+ public void startWalking(Collection<Node> startNodes,
+ HashMap<Node, Object> nodeOutput) throws SemanticException {
toWalk.addAll(startNodes);
- while(toWalk.size() > 0) {
+ while (toWalk.size() > 0) {
Node nd = toWalk.remove(0);
walk(nd);
if (nodeOutput != null) {
@@ -101,19 +109,23 @@
/**
* walk the current operator and its descendants
- * @param nd current operator in the graph
+ *
+ * @param nd
+ * current operator in the graph
* @throws SemanticException
*/
public void walk(Node nd) throws SemanticException {
- if (opStack.empty() || nd != opStack.peek())
+ if (opStack.empty() || nd != opStack.peek()) {
opStack.push(nd);
+ }
- if((nd.getChildren() == null)
+ if ((nd.getChildren() == null)
|| getDispatchedList().containsAll(nd.getChildren())) {
// all children are done or no need to walk the children
- if(getDispatchedList().contains(nd))
+ if (getDispatchedList().contains(nd)) {
// sanity check
assert false;
+ }
dispatch(nd, opStack);
opStack.pop();
return;
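
A minimal sketch of how a DefaultGraphWalker is typically set up and started; the constructor and startWalking() signature match the hunks above, while the dispatcher and start nodes are supplied by the caller.

import java.util.ArrayList;
import java.util.HashMap;

import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.parse.SemanticException;

public class WalkSketch {
  public static HashMap<Node, Object> walkAll(Dispatcher disp,
      ArrayList<Node> startNodes) throws SemanticException {
    DefaultGraphWalker walker = new DefaultGraphWalker(disp);
    HashMap<Node, Object> nodeOutputs = new HashMap<Node, Object>();
    // Children are dispatched before their parents; each node's result is
    // recorded in nodeOutputs.
    walker.startWalking(startNodes, nodeOutputs);
    return nodeOutputs;
  }
}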
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultRuleDispatcher.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultRuleDispatcher.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultRuleDispatcher.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultRuleDispatcher.java Thu Jan 21 10:37:58 2010
@@ -24,36 +24,44 @@
import org.apache.hadoop.hive.ql.parse.SemanticException;
/**
- * Dispatches calls to relevant method in processor. The user registers various rules with the dispatcher, and
- * the processor corresponding to closest matching rule is fired.
+ * Dispatches calls to the relevant method in the processor. The user registers
+ * various rules with the dispatcher, and the processor corresponding to the
+ * closest matching rule is fired.
*/
public class DefaultRuleDispatcher implements Dispatcher {
-
- private Map<Rule, NodeProcessor> procRules;
- private NodeProcessorCtx procCtx;
- private NodeProcessor defaultProc;
+
+ private final Map<Rule, NodeProcessor> procRules;
+ private final NodeProcessorCtx procCtx;
+ private final NodeProcessor defaultProc;
/**
* constructor
- * @param defaultProc default processor to be fired if no rule matches
- * @param rules operator processor that handles actual processing of the node
- * @param procCtx operator processor context, which is opaque to the dispatcher
+ *
+ * @param defaultProc
+ * default processor to be fired if no rule matches
+ * @param rules
+ * operator processor that handles actual processing of the node
+ * @param procCtx
+ * operator processor context, which is opaque to the dispatcher
*/
- public DefaultRuleDispatcher(NodeProcessor defaultProc,
- Map<Rule, NodeProcessor> rules, NodeProcessorCtx procCtx) {
+ public DefaultRuleDispatcher(NodeProcessor defaultProc,
+ Map<Rule, NodeProcessor> rules, NodeProcessorCtx procCtx) {
this.defaultProc = defaultProc;
- this.procRules = rules;
- this.procCtx = procCtx;
+ procRules = rules;
+ this.procCtx = procCtx;
}
/**
* dispatcher function
- * @param nd operator to process
- * @param ndStack the operators encountered so far
+ *
+ * @param nd
+ * operator to process
+ * @param ndStack
+ * the operators encountered so far
* @throws SemanticException
*/
- public Object dispatch(Node nd, Stack<Node> ndStack, Object... nodeOutputs)
- throws SemanticException {
+ public Object dispatch(Node nd, Stack<Node> ndStack, Object... nodeOutputs)
+ throws SemanticException {
// find the firing rule
// find the rule from the stack specified
@@ -69,17 +77,18 @@
NodeProcessor proc;
- if (rule == null)
+ if (rule == null) {
proc = defaultProc;
- else
+ } else {
proc = procRules.get(rule);
+ }
// Do nothing in case proc is null
if (proc != null) {
// Call the process function
return proc.process(nd, ndStack, procCtx, nodeOutputs);
- }
- else
+ } else {
return null;
+ }
}
}
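
A minimal sketch of assembling a DefaultRuleDispatcher; the three-argument constructor and the dispatch() flow match the hunks above, while the Rule and the processors are placeholders the caller would provide.

import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.lib.Rule;

public class DispatcherSetupSketch {
  public static Dispatcher build(NodeProcessor defaultProc,
      Rule someRule, NodeProcessor someProc, NodeProcessorCtx ctx) {
    // The closest-matching rule wins; defaultProc fires when no rule matches.
    Map<Rule, NodeProcessor> rules = new LinkedHashMap<Rule, NodeProcessor>();
    rules.put(someRule, someProc);
    return new DefaultRuleDispatcher(defaultProc, rules, ctx);
  }
}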
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/Dispatcher.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/Dispatcher.java?rev=901644&r1=901643&r2=901644&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/Dispatcher.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lib/Dispatcher.java Thu Jan 21 10:37:58 2010
@@ -23,20 +23,24 @@
import org.apache.hadoop.hive.ql.parse.SemanticException;
/**
- * Dispatcher interface for Operators
- * Used in operator graph walking to dispatch process/visitor functions for operators
+ * Dispatcher interface for operators. Used in operator graph walking to
+ * dispatch process/visitor functions for operators.
*/
public interface Dispatcher {
/**
* Dispatcher function.
- * @param nd operator to process.
- * @param stack operator stack to process.
- * @param nodeOutputs The argument list of outputs from processing other nodes that are
- * passed to this dispatcher from the walker.
+ *
+ * @param nd
+ * operator to process.
+ * @param stack
+ * operator stack to process.
+ * @param nodeOutputs
+ * The argument list of outputs from processing other nodes that are
+ * passed to this dispatcher from the walker.
* @return Object The return object from the processing call.
* @throws SemanticException
*/
- public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
- throws SemanticException;
+ public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
+ throws SemanticException;
}
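
For completeness, a minimal custom Dispatcher implementation satisfying the interface above; the body is purely illustrative and simply records each node it visits (Node.getName() is assumed here, not shown in this diff).

import java.util.ArrayList;
import java.util.List;
import java.util.Stack;

import org.apache.hadoop.hive.ql.lib.Dispatcher;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.parse.SemanticException;

public class TracingDispatcher implements Dispatcher {
  private final List<String> visited = new ArrayList<String>();

  public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
      throws SemanticException {
    visited.add(nd.getName()); // record traversal order; getName() assumed
    return null;               // no per-node result handed back to the walker
  }

  public List<String> getVisited() {
    return visited;
  }
}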