Posted to commits@hive.apache.org by om...@apache.org on 2014/03/26 19:14:39 UTC
svn commit: r1581977 [2/5] - in /hive/trunk: ./
common/src/java/org/apache/hadoop/hive/common/
itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/
metastore/src/java/org/apache/hadoop/hive/metastore/
metastore/src/java/org/apache/hadoop/hi...
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1581977&r1=1581976&r2=1581977&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Wed Mar 26 18:14:37 2014
@@ -20,8 +20,8 @@ package org.apache.hadoop.hive.ql.io.orc
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -36,25 +36,28 @@ import org.apache.hadoop.fs.BlockLocatio
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidTxnListImpl;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.AcidInputFormat;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
-import org.apache.hadoop.hive.ql.io.orc.Metadata;
-import org.apache.hadoop.hive.ql.io.orc.Reader.FileMetaInfo;
-import org.apache.hadoop.hive.ql.io.orc.RecordReader;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
-import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.LongWritable;
@@ -72,17 +75,37 @@ import com.google.common.cache.CacheBuil
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* A MapReduce/Hive input format for ORC files.
+ * <p>
+ * This class implements both the classic InputFormat, which stores the rows
+ * directly, and AcidInputFormat, which stores a series of events with the
+ * following schema:
+ * <pre>
+ * class AcidEvent<ROW> {
+ * enum ACTION {INSERT, UPDATE, DELETE}
+ * ACTION operation;
+ * long originalTransaction;
+ * int bucket;
+ * long rowId;
+ * long currentTransaction;
+ * ROW row;
+ * }
+ * </pre>
+ * Each AcidEvent object corresponds to a single row-level event. The
+ * originalTransaction, bucket, and rowId are the unique identifier for the row.
+ * The operation and currentTransaction are the operation and the transaction
+ * that added this event. Insert and update events include the entire row, while
+ * delete events have null for row.
*/
public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
- InputFormatChecker, VectorizedInputFormatInterface {
-
- VectorizedOrcInputFormat voif = new VectorizedOrcInputFormat();
+ InputFormatChecker, VectorizedInputFormatInterface,
+ AcidInputFormat<OrcStruct> {
private static final Log LOG = LogFactory.getLog(OrcInputFormat.class);
+ static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
static final String MIN_SPLIT_SIZE =
- ShimLoader.getHadoopShims().getHadoopConfNames().get("MAPREDMINSPLITSIZE");
+ SHIMS.getHadoopConfNames().get("MAPREDMINSPLITSIZE");
static final String MAX_SPLIT_SIZE =
- ShimLoader.getHadoopShims().getHadoopConfNames().get("MAPREDMAXSPLITSIZE");
+ SHIMS.getHadoopConfNames().get("MAPREDMAXSPLITSIZE");
private static final long DEFAULT_MIN_SPLIT_SIZE = 16 * 1024 * 1024;
private static final long DEFAULT_MAX_SPLIT_SIZE = 256 * 1024 * 1024;
@@ -113,13 +136,13 @@ public class OrcInputFormat implements
OrcRecordReader(Reader file, Configuration conf,
- long offset, long length) throws IOException {
+ FileSplit split) throws IOException {
List<OrcProto.Type> types = file.getTypes();
this.file = file;
numColumns = (types.size() == 0) ? 0 : types.get(0).getSubtypesCount();
+ this.offset = split.getStart();
+ this.length = split.getLength();
this.reader = createReaderFromFile(file, conf, offset, length);
- this.offset = offset;
- this.length = length;
this.stats = new SerDeStats();
}
@@ -166,139 +189,107 @@ public class OrcInputFormat implements
return stats;
}
}
-
- static RecordReader createReaderFromFile(
- Reader file, Configuration conf, long offset, long length)
- throws IOException {
- List<OrcProto.Type> types = file.getTypes();
- boolean[] includedColumns = findIncludedColumns(types, conf);
- String[] columnNames = getIncludedColumnNames(types, includedColumns,
- conf);
- SearchArgument sarg = createSarg(types, conf);
- RecordReader reader =
- file.rows(offset, length, includedColumns, sarg, columnNames);
- return reader;
+
+ /**
+ * Get the root column for the row. In ACID format files, it is offset by
+ * the extra metadata columns.
+ * @param isOriginal is the file in the original format?
+ * @return the column number for the root of row.
+ */
+ private static int getRootColumn(boolean isOriginal) {
+ return isOriginal ? 0 : (OrcRecordUpdater.ROW + 1);
}
- private static final PathFilter hiddenFileFilter = new PathFilter(){
- public boolean accept(Path p){
- String name = p.getName();
- return !name.startsWith("_") && !name.startsWith(".");
- }
- };
+ public static RecordReader createReaderFromFile(Reader file,
+ Configuration conf,
+ long offset, long length
+ ) throws IOException {
+ Reader.Options options = new Reader.Options().range(offset, length);
+ boolean isOriginal =
+ !file.hasMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME);
+ List<OrcProto.Type> types = file.getTypes();
+ setIncludedColumns(options, types, conf, isOriginal);
+ setSearchArgument(options, types, conf, isOriginal);
+ return file.rowsOptions(options);
+ }
/**
* Recurse down into a type subtree turning on all of the sub-columns.
* @param types the types of the file
* @param result the global view of columns that should be included
* @param typeId the root of tree to enable
+ * @param rootColumn the top column
*/
- static void includeColumnRecursive(List<OrcProto.Type> types,
+ private static void includeColumnRecursive(List<OrcProto.Type> types,
boolean[] result,
- int typeId) {
- result[typeId] = true;
+ int typeId,
+ int rootColumn) {
+ result[typeId - rootColumn] = true;
OrcProto.Type type = types.get(typeId);
int children = type.getSubtypesCount();
for(int i=0; i < children; ++i) {
- includeColumnRecursive(types, result, type.getSubtypes(i));
- }
- }
-
- public static SearchArgument createSarg(List<OrcProto.Type> types, Configuration conf) {
- String serializedPushdown = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
- if (serializedPushdown == null
- || conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR) == null) {
- LOG.debug("No ORC pushdown predicate");
- return null;
- }
- SearchArgument sarg = SearchArgument.FACTORY.create
- (Utilities.deserializeExpression(serializedPushdown));
- LOG.info("ORC pushdown predicate: " + sarg);
- return sarg;
- }
-
- public static String[] getIncludedColumnNames(
- List<OrcProto.Type> types, boolean[] includedColumns, Configuration conf) {
- String columnNamesString = conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
- if (LOG.isDebugEnabled()) {
- LOG.debug("included columns names = " + columnNamesString);
- }
- if (columnNamesString == null || conf.get(TableScanDesc.FILTER_EXPR_CONF_STR) == null) {
- return null;
- }
- String[] neededColumnNames = columnNamesString.split(",");
- int i = 0;
- String[] columnNames = new String[types.size()];
- for(int columnId: types.get(0).getSubtypesList()) {
- if (includedColumns == null || includedColumns[columnId]) {
- columnNames[columnId] = neededColumnNames[i++];
- }
+ includeColumnRecursive(types, result, type.getSubtypes(i), rootColumn);
}
- return columnNames;
}
/**
* Take the configuration and figure out which columns we need to include.
- * @param types the types of the file
+ * @param options the options to update
+ * @param types the types for the file
* @param conf the configuration
- * @return true for each column that should be included
+ * @param isOriginal is the file in the original format?
*/
- public static boolean[] findIncludedColumns(List<OrcProto.Type> types, Configuration conf) {
- LOG.info("included column ids = " + conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
- if (ColumnProjectionUtils.isReadAllColumns(conf)) {
- return null;
- } else {
- int numColumns = types.size();
+ static void setIncludedColumns(Reader.Options options,
+ List<OrcProto.Type> types,
+ Configuration conf,
+ boolean isOriginal) {
+ int rootColumn = getRootColumn(isOriginal);
+ if (!ColumnProjectionUtils.isReadAllColumns(conf)) {
+ int numColumns = types.size() - rootColumn;
boolean[] result = new boolean[numColumns];
result[0] = true;
- OrcProto.Type root = types.get(0);
+ OrcProto.Type root = types.get(rootColumn);
List<Integer> included = ColumnProjectionUtils.getReadColumnIDs(conf);
for(int i=0; i < root.getSubtypesCount(); ++i) {
if (included.contains(i)) {
- includeColumnRecursive(types, result, root.getSubtypes(i));
+ includeColumnRecursive(types, result, root.getSubtypes(i),
+ rootColumn);
}
}
- // if we are filtering at least one column, return the boolean array
- for(boolean include: result) {
- if (!include) {
- return result;
- }
- }
- return null;
+ options.include(result);
+ } else {
+ options.include(null);
}
}
- @SuppressWarnings("unchecked")
- @Override
- public org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct>
- getRecordReader(InputSplit inputSplit, JobConf conf,
- Reporter reporter) throws IOException {
- if (isVectorMode(conf)) {
- org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> vorr = voif.getRecordReader(inputSplit, conf,
- reporter);
- return (org.apache.hadoop.mapred.RecordReader) vorr;
- }
- FileSplit fSplit = (FileSplit)inputSplit;
- reporter.setStatus(fSplit.toString());
- Path path = fSplit.getPath();
- FileSystem fs = path.getFileSystem(conf);
- Reader reader = null;
-
- if(!(fSplit instanceof OrcSplit)){
- //If CombineHiveInputFormat is used, it works with FileSplit and not OrcSplit
- reader = OrcFile.createReader(fs, path, conf);
+ static void setSearchArgument(Reader.Options options,
+ List<OrcProto.Type> types,
+ Configuration conf,
+ boolean isOriginal) {
+ int rootColumn = getRootColumn(isOriginal);
+ String serializedPushdown = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
+ String columnNamesString =
+ conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
+ if (serializedPushdown == null || columnNamesString == null) {
+ LOG.debug("No ORC pushdown predicate");
+ options.searchArgument(null, null);
} else {
- //We have OrcSplit, which may have footer metadata cached, so use the appropriate reader
- //constructor
- OrcSplit orcSplit = (OrcSplit) fSplit;
- if (orcSplit.hasFooter()) {
- FileMetaInfo fMetaInfo = orcSplit.getFileMetaInfo();
- reader = OrcFile.createReader(fs, path, fMetaInfo, conf);
- } else {
- reader = OrcFile.createReader(fs, path, conf);
+ SearchArgument sarg = SearchArgument.FACTORY.create
+ (Utilities.deserializeExpression(serializedPushdown));
+ LOG.info("ORC pushdown predicate: " + sarg);
+ String[] neededColumnNames = columnNamesString.split(",");
+ String[] columnNames = new String[types.size() - rootColumn];
+ boolean[] includedColumns = options.getInclude();
+ int i = 0;
+ for(int columnId: types.get(rootColumn).getSubtypesList()) {
+ if (includedColumns == null || includedColumns[columnId]) {
+ // this is guaranteed to be positive because types only have children
+ // ids greater than their own id.
+ columnNames[columnId - rootColumn] = neededColumnNames[i++];
+ }
}
+ options.searchArgument(sarg, columnNames);
}
- return new OrcRecordReader(reader, conf, fSplit.getStart(), fSplit.getLength());
}
@Override
@@ -306,8 +297,8 @@ public class OrcInputFormat implements
ArrayList<FileStatus> files
) throws IOException {
- if (isVectorMode(conf)) {
- return voif.validateInput(fs, conf, files);
+ if (Utilities.isVectorMode(conf)) {
+ return new VectorizedOrcInputFormat().validateInput(fs, conf, files);
}
if (files.size() <= 0) {
@@ -315,7 +306,8 @@ public class OrcInputFormat implements
}
for (FileStatus file : files) {
try {
- OrcFile.createReader(fs, file.getPath(), conf);
+ OrcFile.createReader(file.getPath(),
+ OrcFile.readerOptions(conf).filesystem(fs));
} catch (IOException e) {
return false;
}
@@ -323,10 +315,6 @@ public class OrcInputFormat implements
return true;
}
- private boolean isVectorMode(Configuration conf) {
- return Utilities.isVectorMode(conf);
- }
-
/**
* Get the list of input {@link Path}s for the map-reduce job.
*
@@ -351,43 +339,13 @@ public class OrcInputFormat implements
* the different worker threads.
*/
static class Context {
- static class FileSplitInfo {
- FileSplitInfo(Path file, long start, long length, String[] hosts,
- FileMetaInfo fileMetaInfo) {
- this.file = file;
- this.start = start;
- this.length = length;
- this.hosts = hosts;
- this.fileMetaInfo = fileMetaInfo;
- }
- Path getPath() {
- return file;
- }
- long getStart() {
- return start;
- }
- long getLength() {
- return length;
- }
- String[] getLocations() {
- return hosts;
- }
- FileMetaInfo getFileMetaInfo() {
- return fileMetaInfo;
- }
- private Path file;
- private long start;
- private long length;
- private String[] hosts;
- FileMetaInfo fileMetaInfo;
- }
private final Configuration conf;
private static Cache<Path, FileInfo> footerCache;
private final ExecutorService threadPool;
- private final List<FileSplitInfo> splits =
- new ArrayList<FileSplitInfo>(10000);
+ private final List<OrcSplit> splits =
+ new ArrayList<OrcSplit>(10000);
+ private final int numBuckets;
private final List<Throwable> errors = new ArrayList<Throwable>();
- private final HadoopShims shims = ShimLoader.getHadoopShims();
private final long maxSize;
private final long minSize;
private final boolean footerInSplits;
@@ -395,6 +353,7 @@ public class OrcInputFormat implements
private final AtomicInteger cacheHitCounter = new AtomicInteger(0);
private final AtomicInteger numFilesCounter = new AtomicInteger(0);
private Throwable fatalError = null;
+ private ValidTxnList transactionList;
/**
* A count of the number of threads that may create more work for the
@@ -406,15 +365,20 @@ public class OrcInputFormat implements
this.conf = conf;
minSize = conf.getLong(MIN_SPLIT_SIZE, DEFAULT_MIN_SPLIT_SIZE);
maxSize = conf.getLong(MAX_SPLIT_SIZE, DEFAULT_MAX_SPLIT_SIZE);
- footerInSplits = HiveConf.getBoolVar(conf, ConfVars.HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS);
+ footerInSplits = HiveConf.getBoolVar(conf,
+ ConfVars.HIVE_ORC_INCLUDE_FILE_FOOTER_IN_SPLITS);
+ numBuckets =
+ Math.max(conf.getInt(hive_metastoreConstants.BUCKET_COUNT, 0), 0);
int cacheStripeDetailsSize = HiveConf.getIntVar(conf,
ConfVars.HIVE_ORC_CACHE_STRIPE_DETAILS_SIZE);
- int numThreads = HiveConf.getIntVar(conf, ConfVars.HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS);
+ int numThreads = HiveConf.getIntVar(conf,
+ ConfVars.HIVE_ORC_COMPUTE_SPLITS_NUM_THREADS);
cacheStripeDetails = (cacheStripeDetailsSize > 0);
threadPool = Executors.newFixedThreadPool(numThreads,
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ORC_GET_SPLITS #%d").build());
+ new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("ORC_GET_SPLITS #%d").build());
synchronized (Context.class) {
if (footerCache == null && cacheStripeDetails) {
@@ -422,6 +386,9 @@ public class OrcInputFormat implements
.initialCapacity(cacheStripeDetailsSize).softValues().build();
}
}
+ String value = conf.get(ValidTxnList.VALID_TXNS_KEY,
+ Long.MAX_VALUE + ":");
+ transactionList = new ValidTxnListImpl(value);
}
int getSchedulers() {
@@ -432,9 +399,9 @@ public class OrcInputFormat implements
* Get the Nth split.
* @param index if index >= 0, count from the front, otherwise count from
* the back.
- * @result the Nth file split
+ * @return the Nth file split
*/
- FileSplitInfo getResult(int index) {
+ OrcSplit getResult(int index) {
if (index >= 0) {
return splits.get(index);
} else {
@@ -452,7 +419,8 @@ public class OrcInputFormat implements
*/
synchronized void schedule(Runnable runnable) {
if (fatalError == null) {
- if (runnable instanceof FileGenerator || runnable instanceof SplitGenerator) {
+ if (runnable instanceof FileGenerator ||
+ runnable instanceof SplitGenerator) {
schedulers += 1;
}
threadPool.execute(runnable);
@@ -513,23 +481,65 @@ public class OrcInputFormat implements
this.dir = dir;
}
+ private void scheduleSplits(FileStatus file,
+ boolean isOriginal,
+ boolean hasBase,
+ List<Long> deltas) throws IOException{
+ FileInfo info = null;
+ if (context.cacheStripeDetails) {
+ info = verifyCachedFileInfo(file);
+ }
+ new SplitGenerator(context, fs, file, info, isOriginal, deltas,
+ hasBase).schedule();
+ }
+
/**
* For each path, get the list of files and blocks that they consist of.
*/
@Override
public void run() {
try {
- Iterator<FileStatus> itr = context.shims.listLocatedStatus(fs, dir,
- hiddenFileFilter);
- while (itr.hasNext()) {
- FileStatus file = itr.next();
- if (!file.isDir()) {
- FileInfo fileInfo = null;
- if (context.cacheStripeDetails) {
- fileInfo = verifyCachedFileInfo(file);
+ AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir,
+ context.conf, context.transactionList);
+ List<Long> deltas =
+ AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories());
+ Path base = dirInfo.getBaseDirectory();
+ List<FileStatus> original = dirInfo.getOriginalFiles();
+
+ boolean[] covered = new boolean[context.numBuckets];
+ boolean isOriginal = base == null;
+
+ // if we have a base to work from
+ if (base != null || !original.isEmpty()) {
+
+ // find the base files (original or new style)
+ List<FileStatus> children = original;
+ if (base != null) {
+ children = SHIMS.listLocatedStatus(fs, base,
+ AcidUtils.hiddenFileFilter);
+ }
+
+ // for each child, schedule splits and mark off the bucket
+ for(FileStatus child: children) {
+ AcidOutputFormat.Options opts = AcidUtils.parseBaseBucketFilename
+ (child.getPath(), context.conf);
+ scheduleSplits(child, isOriginal, true, deltas);
+ int b = opts.getBucket();
+ // If the bucket is in the valid range, mark it as covered.
+ // I wish Hive actually enforced bucketing all of the time.
+ if (b >= 0 && b < covered.length) {
+ covered[b] = true;
}
- SplitGenerator spgen = new SplitGenerator(context, fs, file, fileInfo);
- spgen.schedule();
+ }
+ }
+
+ // Generate a split for any buckets that weren't covered.
+ // This happens in the case where a bucket just has deltas and no
+ // base.
+ for(int b=0; b < context.numBuckets; ++b) {
+ if (!covered[b]) {
+ context.splits.add(new OrcSplit(dir, b, 0, new String[0], null,
+ false, false, deltas));
}
}
} catch (Throwable th) {
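To make the coverage pass above concrete, here is a minimal standalone
sketch of the same idea: schedule one split per base (or original) file,
mark the bucket it covers, then emit a delta-only split for each bucket
that never saw a base file. BucketFile and Split are hypothetical
stand-ins for illustration, not Hive classes.

import java.util.ArrayList;
import java.util.List;

class BucketCoverageSketch {
  record BucketFile(int bucket, String path) {}
  record Split(String path, int bucket, boolean deltaOnly) {}

  static List<Split> planSplits(List<BucketFile> baseFiles, int numBuckets,
                                String dir) {
    boolean[] covered = new boolean[numBuckets];
    List<Split> splits = new ArrayList<>();
    for (BucketFile f : baseFiles) {
      splits.add(new Split(f.path(), f.bucket(), false));
      // only mark buckets in the valid range, mirroring the check above
      if (f.bucket() >= 0 && f.bucket() < numBuckets) {
        covered[f.bucket()] = true;
      }
    }
    // a bucket with deltas but no base still needs a (delta-only) split
    for (int b = 0; b < numBuckets; ++b) {
      if (!covered[b]) {
        splits.add(new Split(dir, b, true));
      }
    }
    return splits;
  }

  public static void main(String[] args) {
    System.out.println(planSplits(
        List.of(new BucketFile(0, "/t/base_5/bucket_00000")), 2, "/t"));
  }
}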
@@ -554,7 +564,8 @@ public class OrcInputFormat implements
if (LOG.isDebugEnabled()) {
LOG.debug("Info cached for path: " + file.getPath());
}
- if (fileInfo.modificationTime == file.getModificationTime() && fileInfo.size == file.getLen()) {
+ if (fileInfo.modificationTime == file.getModificationTime() &&
+ fileInfo.size == file.getLen()) {
// Cached copy is valid
context.cacheHitCounter.incrementAndGet();
return fileInfo;
@@ -562,10 +573,12 @@ public class OrcInputFormat implements
// Invalidate
Context.footerCache.invalidate(file.getPath());
if (LOG.isDebugEnabled()) {
- LOG.debug("Meta-Info for : " + file.getPath() + " changed. CachedModificationTime: "
+ LOG.debug("Meta-Info for : " + file.getPath() +
+ " changed. CachedModificationTime: "
+ fileInfo.modificationTime + ", CurrentModificationTime: "
+ file.getModificationTime()
- + ", CachedLength: " + fileInfo.size + ", CurrentLength: " + file.getLen());
+ + ", CachedLength: " + fileInfo.size + ", CurrentLength: " +
+ file.getLen());
}
}
} else {
@@ -588,20 +601,28 @@ public class OrcInputFormat implements
private final long blockSize;
private final BlockLocation[] locations;
private final FileInfo fileInfo;
- private Iterable<StripeInformation> stripes;
- private FileMetaInfo fileMetaInfo;
+ private List<StripeInformation> stripes;
+ private ReaderImpl.FileMetaInfo fileMetaInfo;
private Metadata metadata;
private List<OrcProto.Type> types;
-
+ private final boolean isOriginal;
+ private final List<Long> deltas;
+ private final boolean hasBase;
SplitGenerator(Context context, FileSystem fs,
- FileStatus file, FileInfo fileInfo) throws IOException {
+ FileStatus file, FileInfo fileInfo,
+ boolean isOriginal,
+ List<Long> deltas,
+ boolean hasBase) throws IOException {
this.context = context;
this.fs = fs;
this.file = file;
this.blockSize = file.getBlockSize();
this.fileInfo = fileInfo;
- locations = context.shims.getLocations(fs, file);
+ locations = SHIMS.getLocations(fs, file);
+ this.isOriginal = isOriginal;
+ this.deltas = deltas;
+ this.hasBase = hasBase;
}
Path getPath() {
@@ -612,8 +633,8 @@ public class OrcInputFormat implements
if(locations.length == 1 && file.getLen() < context.maxSize) {
String[] hosts = locations[0].getHosts();
synchronized (context.splits) {
- context.splits.add(new Context.FileSplitInfo(file.getPath(), 0,
- file.getLen(), hosts, fileMetaInfo));
+ context.splits.add(new OrcSplit(file.getPath(), 0, file.getLen(),
+ hosts, fileMetaInfo, isOriginal, hasBase, deltas));
}
} else {
// if it requires a compute task
@@ -655,7 +676,8 @@ public class OrcInputFormat implements
* @param fileMetaInfo file metadata from footer and postscript
* @throws IOException
*/
- void createSplit(long offset, long length, FileMetaInfo fileMetaInfo) throws IOException {
+ void createSplit(long offset, long length,
+ ReaderImpl.FileMetaInfo fileMetaInfo) throws IOException {
String[] hosts;
if ((offset % blockSize) + length <= blockSize) {
// handle the single block case
@@ -699,8 +721,8 @@ public class OrcInputFormat implements
hostList.toArray(hosts);
}
synchronized (context.splits) {
- context.splits.add(new Context.FileSplitInfo(file.getPath(), offset,
- length, hosts, fileMetaInfo));
+ context.splits.add(new OrcSplit(file.getPath(), offset, length,
+ hosts, fileMetaInfo, isOriginal, hasBase, deltas));
}
}
@@ -712,30 +734,43 @@ public class OrcInputFormat implements
public void run() {
try {
populateAndCacheStripeDetails();
- Configuration conf = context.conf;
- SearchArgument sarg = createSarg(types, conf);
- List<StripeStatistics> stripeStats = null;
- int[] filterColumns = null;
- if (sarg != null) {
- List<PredicateLeaf> sargLeaves = null;
- String[] allColumns = conf.get(serdeConstants.LIST_COLUMNS).split(",");
- String[] neededColumns = conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR).split(",");
- sargLeaves = sarg.getLeaves();
- filterColumns = new int[sargLeaves.size()];
- for (int i = 0; i < filterColumns.length; ++i) {
- String colName = sargLeaves.get(i).getColumnName();
-
- // if needed columns does not contain the column specified in filter expression then
- // it must be partition column. There will not be columns within ORC file for partitioned
- // column, so we can ignore them
- if (containsColumn(neededColumns, colName)) {
- filterColumns[i] = RecordReaderImpl.findColumns(allColumns, colName);
- } else {
- filterColumns[i] = -1;
+
+ // figure out which stripes we need to read
+ boolean[] includeStripe = null;
+ // we can't eliminate stripes if there are deltas because the
+ // deltas may change the rows making them match the predicate.
+ if (deltas.isEmpty()) {
+ Reader.Options options = new Reader.Options();
+ setIncludedColumns(options, types, context.conf, isOriginal);
+ setSearchArgument(options, types, context.conf, isOriginal);
+ if (options.getSearchArgument() != null) {
+ SearchArgument sarg = options.getSearchArgument();
+ List<PredicateLeaf> sargLeaves = sarg.getLeaves();
+ List<StripeStatistics> stripeStats = metadata.getStripeStatistics();
+ int[] filterColumns = RecordReaderImpl.mapSargColumns(sargLeaves,
+ options.getColumnNames(), getRootColumn(isOriginal));
+
+ if (stripeStats != null) {
+ // eliminate stripes that don't satisfy the predicate condition
+ includeStripe = new boolean[stripes.size()];
+ for(int i=0; i < stripes.size(); ++i) {
+ includeStripe[i] = (i >= stripeStats.size()) ||
+ isStripeSatisfyPredicate(stripeStats.get(i), sarg,
+ filterColumns);
+ if (LOG.isDebugEnabled() && !includeStripe[i]) {
+ LOG.debug("Eliminating ORC stripe-" + i + " of file '" +
+ file.getPath() + "' as it did not satisfy " +
+ "predicate condition.");
+ }
+ }
}
}
+ }
- stripeStats = metadata.getStripeStatistics();
+ // if we didn't have predicate pushdown, read everything
+ if (includeStripe == null) {
+ includeStripe = new boolean[stripes.size()];
+ Arrays.fill(includeStripe, true);
}
long currentOffset = -1;
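The stripe elimination above only applies when there are no deltas and a
search argument is present. Below is a self-contained sketch of the core
test, assuming a single equality predicate and a hypothetical StripeStats
type: a stripe is kept unless its min/max statistics prove the predicate
can never match, and stripes without statistics are always kept.

import java.util.Arrays;
import java.util.List;

class StripeEliminationSketch {
  record StripeStats(long min, long max) {}

  // include stripe i unless its stats rule out the predicate "col = v"
  static boolean[] pickStripes(List<StripeStats> stats, long v, int stripes) {
    boolean[] include = new boolean[stripes];
    for (int i = 0; i < stripes; ++i) {
      // no statistics for this stripe: must include it
      include[i] = (i >= stats.size()) ||
          (v >= stats.get(i).min() && v <= stats.get(i).max());
    }
    return include;
  }

  public static void main(String[] args) {
    List<StripeStats> stats = Arrays.asList(
        new StripeStats(0, 9), new StripeStats(10, 19));
    System.out.println(Arrays.toString(pickStripes(stats, 12, 3)));
    // [false, true, true] -> only stripes that might contain 12 are read
  }
}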
@@ -744,18 +779,7 @@ public class OrcInputFormat implements
for(StripeInformation stripe: stripes) {
idx++;
- // eliminate stripes that doesn't satisfy the predicate condition
- if (sarg != null &&
- stripeStats != null &&
- idx < stripeStats.size() &&
- !isStripeSatisfyPredicate(stripeStats.get(idx), sarg, filterColumns)) {
-
- // if a stripe doesn't satisfy predicate condition then skip it
- if (LOG.isDebugEnabled()) {
- LOG.debug("Eliminating ORC stripe-" + idx + " of file '" + file.getPath()
- + "' as it did not satisfy predicate condition.");
- }
-
+ if (!includeStripe[idx]) {
// create split for the previous unfinished stripe
if (currentOffset != -1) {
createSplit(currentOffset, currentLength, fileMetaInfo);
@@ -776,7 +800,8 @@ public class OrcInputFormat implements
currentOffset = stripe.getOffset();
currentLength = stripe.getLength();
} else {
- currentLength = (stripe.getOffset() + stripe.getLength()) - currentOffset;
+ currentLength =
+ (stripe.getOffset() + stripe.getLength()) - currentOffset;
}
if (currentLength >= context.maxSize) {
createSplit(currentOffset, currentLength, fileMetaInfo);
@@ -804,32 +829,32 @@ public class OrcInputFormat implements
private void populateAndCacheStripeDetails() {
try {
Reader orcReader;
- boolean found = false;
if (fileInfo != null) {
- found = true;
stripes = fileInfo.stripeInfos;
fileMetaInfo = fileInfo.fileMetaInfo;
metadata = fileInfo.metadata;
types = fileInfo.types;
// For multiple runs, in case sendSplitsInFooter changes
if (fileMetaInfo == null && context.footerInSplits) {
- orcReader = OrcFile.createReader(fs, file.getPath(), context.conf);
- fileInfo.fileMetaInfo = orcReader.getFileMetaInfo();
+ orcReader = OrcFile.createReader(file.getPath(),
+ OrcFile.readerOptions(context.conf).filesystem(fs));
+ fileInfo.fileMetaInfo = ((ReaderImpl) orcReader).getFileMetaInfo();
fileInfo.metadata = orcReader.getMetadata();
fileInfo.types = orcReader.getTypes();
}
- }
- if (!found) {
- orcReader = OrcFile.createReader(fs, file.getPath(), context.conf);
+ } else {
+ orcReader = OrcFile.createReader(file.getPath(),
+ OrcFile.readerOptions(context.conf).filesystem(fs));
stripes = orcReader.getStripes();
metadata = orcReader.getMetadata();
types = orcReader.getTypes();
- fileMetaInfo = context.footerInSplits ? orcReader.getFileMetaInfo() : null;
+ fileMetaInfo = context.footerInSplits ?
+ ((ReaderImpl) orcReader).getFileMetaInfo() : null;
if (context.cacheStripeDetails) {
// Populate into cache.
Context.footerCache.put(file.getPath(),
- new FileInfo(file.getModificationTime(), file.getLen(), stripes, metadata,
- types, fileMetaInfo));
+ new FileInfo(file.getModificationTime(), file.getLen(), stripes,
+ metadata, types, fileMetaInfo));
}
}
} catch (Throwable th) {
@@ -845,45 +870,35 @@ public class OrcInputFormat implements
}
}
- private boolean containsColumn(String[] neededColumns, String colName) {
- for (String col : neededColumns) {
- if (colName.equalsIgnoreCase(col)) {
- return true;
- }
- }
- return false;
- }
-
private boolean isStripeSatisfyPredicate(StripeStatistics stripeStatistics,
- SearchArgument sarg, int[] filterColumns) {
- if (sarg != null && filterColumns != null) {
- List<PredicateLeaf> predLeaves = sarg.getLeaves();
- TruthValue[] truthValues = new TruthValue[predLeaves.size()];
- for (int pred = 0; pred < truthValues.length; pred++) {
- if (filterColumns[pred] != -1) {
-
- // column statistics at index 0 contains only the number of rows
- ColumnStatistics stats = stripeStatistics.getColumnStatistics()[filterColumns[pred] + 1];
- Object minValue = RecordReaderImpl.getMin(stats);
- Object maxValue = RecordReaderImpl.getMax(stats);
- PredicateLeaf predLeaf = predLeaves.get(pred);
- truthValues[pred] = RecordReaderImpl.evaluatePredicateRange(predLeaf, minValue, maxValue);
- } else {
+ SearchArgument sarg,
+ int[] filterColumns) {
+ List<PredicateLeaf> predLeaves = sarg.getLeaves();
+ TruthValue[] truthValues = new TruthValue[predLeaves.size()];
+ for (int pred = 0; pred < truthValues.length; pred++) {
+ if (filterColumns[pred] != -1) {
+
+ // column statistics at index 0 contains only the number of rows
+ ColumnStatistics stats =
+ stripeStatistics.getColumnStatistics()[filterColumns[pred]];
+ Object minValue = RecordReaderImpl.getMin(stats);
+ Object maxValue = RecordReaderImpl.getMax(stats);
+ truthValues[pred] =
+ RecordReaderImpl.evaluatePredicateRange(predLeaves.get(pred),
+ minValue, maxValue);
+ } else {
- // parition column case.
- // partition filter will be evaluated by partition pruner so
- // we will not evaluate partition filter here.
- truthValues[pred] = TruthValue.YES_NO_NULL;
- }
+ // partition column case.
+ // partition filter will be evaluated by partition pruner so
+ // we will not evaluate partition filter here.
+ truthValues[pred] = TruthValue.YES_NO_NULL;
}
- return sarg.evaluate(truthValues).isNeeded();
}
- return true;
+ return sarg.evaluate(truthValues).isNeeded();
}
-
}
- static List<Context.FileSplitInfo> generateSplitsInfo(Configuration conf)
+ static List<OrcSplit> generateSplitsInfo(Configuration conf)
throws IOException {
// use threads to resolve directories into splits
Context context = new Context(conf);
@@ -911,20 +926,14 @@ public class OrcInputFormat implements
}
return context.splits;
}
+
@Override
public InputSplit[] getSplits(JobConf job,
int numSplits) throws IOException {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
- List<OrcInputFormat.Context.FileSplitInfo> splits =
- OrcInputFormat.generateSplitsInfo(job);
- InputSplit[] result = new InputSplit[splits.size()];
- for (int i=0;i<splits.size();i++) {
- OrcInputFormat.Context.FileSplitInfo split = splits.get(i);
- result[i] = new OrcSplit(split.getPath(), split.getStart(),
- split.getLength(), split.getLocations(), split.getFileMetaInfo());
- }
+ List<OrcSplit> result = generateSplitsInfo(job);
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
- return result;
+ return result.toArray(new InputSplit[result.size()]);
}
/**
@@ -936,14 +945,16 @@ public class OrcInputFormat implements
private static class FileInfo {
long modificationTime;
long size;
- Iterable<StripeInformation> stripeInfos;
- FileMetaInfo fileMetaInfo;
+ List<StripeInformation> stripeInfos;
+ ReaderImpl.FileMetaInfo fileMetaInfo;
Metadata metadata;
List<OrcProto.Type> types;
- FileInfo(long modificationTime, long size, Iterable<StripeInformation> stripeInfos,
- Metadata metadata, List<OrcProto.Type> types, FileMetaInfo fileMetaInfo) {
+ FileInfo(long modificationTime, long size,
+ List<StripeInformation> stripeInfos,
+ Metadata metadata, List<OrcProto.Type> types,
+ ReaderImpl.FileMetaInfo fileMetaInfo) {
this.modificationTime = modificationTime;
this.size = size;
this.stripeInfos = stripeInfos;
@@ -952,4 +963,219 @@ public class OrcInputFormat implements
this.types = types;
}
}
+
+ @SuppressWarnings("unchecked")
+ private org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct>
+ createVectorizedReader(InputSplit split, JobConf conf, Reporter reporter
+ ) throws IOException {
+ return (org.apache.hadoop.mapred.RecordReader)
+ new VectorizedOrcInputFormat().getRecordReader(split, conf, reporter);
+ }
+
+ @Override
+ public org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct>
+ getRecordReader(InputSplit inputSplit, JobConf conf,
+ Reporter reporter) throws IOException {
+ boolean vectorMode = Utilities.isVectorMode(conf);
+
+ // if CombineHiveInputFormat gives us FileSplits instead of OrcSplits,
+ // we know it is not ACID.
+ if (inputSplit.getClass() == FileSplit.class) {
+ if (vectorMode) {
+ return createVectorizedReader(inputSplit, conf, reporter);
+ }
+ return new OrcRecordReader(OrcFile.createReader(
+ ((FileSplit) inputSplit).getPath(),
+ OrcFile.readerOptions(conf)), conf, (FileSplit) inputSplit);
+ }
+
+ OrcSplit split = (OrcSplit) inputSplit;
+ // TODO vectorized reader doesn't work with the new format yet
+ if (vectorMode) {
+ if (!split.getDeltas().isEmpty() || !split.isOriginal()) {
+ throw new IOException("Vectorization and ACID tables are incompatible."
+ );
+ }
+ return createVectorizedReader(inputSplit, conf, reporter);
+ }
+ reporter.setStatus(inputSplit.toString());
+
+ // if we are strictly old-school, just use the old code
+ if (split.isOriginal() && split.getDeltas().isEmpty()) {
+ return new OrcRecordReader(OrcFile.createReader(split.getPath(),
+ OrcFile.readerOptions(conf)), conf, split);
+ }
+
+ Options options = new Options(conf).reporter(reporter);
+ final RowReader<OrcStruct> inner = getReader(inputSplit, options);
+ final RecordIdentifier id = inner.createKey();
+
+ // Return a RecordReader that is compatible with the Hive 0.12 reader
+ // with NullWritable for the key instead of RecordIdentifier.
+ return new org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct>(){
+ @Override
+ public boolean next(NullWritable nullWritable,
+ OrcStruct orcStruct) throws IOException {
+ return inner.next(id, orcStruct);
+ }
+
+ @Override
+ public NullWritable createKey() {
+ return NullWritable.get();
+ }
+
+ @Override
+ public OrcStruct createValue() {
+ return inner.createValue();
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return inner.getPos();
+ }
+
+ @Override
+ public void close() throws IOException {
+ inner.close();
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return inner.getProgress();
+ }
+ };
+ }
+
+
+ @Override
+ public RowReader<OrcStruct> getReader(InputSplit inputSplit,
+ Options options) throws IOException {
+ final OrcSplit split = (OrcSplit) inputSplit;
+ final Path path = split.getPath();
+ Path root;
+ if (split.hasBase()) {
+ if (split.isOriginal()) {
+ root = path.getParent();
+ } else {
+ root = path.getParent().getParent();
+ }
+ } else {
+ root = path;
+ }
+ final Path[] deltas = AcidUtils.deserializeDeltas(root, split.getDeltas());
+ final Configuration conf = options.getConfiguration();
+ final Reader reader;
+ final int bucket;
+ Reader.Options readOptions = new Reader.Options();
+ readOptions.range(split.getStart(), split.getLength());
+ if (split.hasBase()) {
+ bucket = AcidUtils.parseBaseBucketFilename(split.getPath(), conf)
+ .getBucket();
+ reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+ final List<OrcProto.Type> types = reader.getTypes();
+ setIncludedColumns(readOptions, types, conf, split.isOriginal());
+ setSearchArgument(readOptions, types, conf, split.isOriginal());
+ } else {
+ bucket = (int) split.getStart();
+ reader = null;
+ }
+ String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY,
+ Long.MAX_VALUE + ":");
+ ValidTxnList validTxnList = new ValidTxnListImpl(txnString);
+ final OrcRawRecordMerger records =
+ new OrcRawRecordMerger(conf, true, reader, split.isOriginal(), bucket,
+ validTxnList, readOptions, deltas);
+ return new RowReader<OrcStruct>() {
+ OrcStruct innerRecord = records.createValue();
+
+ @Override
+ public ObjectInspector getObjectInspector() {
+ return ((StructObjectInspector) reader.getObjectInspector())
+ .getAllStructFieldRefs().get(OrcRecordUpdater.ROW)
+ .getFieldObjectInspector();
+ }
+
+ @Override
+ public boolean next(RecordIdentifier recordIdentifier,
+ OrcStruct orcStruct) throws IOException {
+ boolean result;
+ // filter out the deleted records
+ do {
+ result = records.next(recordIdentifier, innerRecord);
+ } while (result &&
+ OrcRecordUpdater.getOperation(innerRecord) ==
+ OrcRecordUpdater.DELETE_OPERATION);
+ if (result) {
+ // swap the fields with the passed in orcStruct
+ orcStruct.linkFields(OrcRecordUpdater.getRow(innerRecord));
+ }
+ return result;
+ }
+
+ @Override
+ public RecordIdentifier createKey() {
+ return records.createKey();
+ }
+
+ @Override
+ public OrcStruct createValue() {
+ return new OrcStruct(records.getColumns());
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return records.getPos();
+ }
+
+ @Override
+ public void close() throws IOException {
+ records.close();
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return records.getProgress();
+ }
+ };
+ }
+
+ static Path findOriginalBucket(FileSystem fs,
+ Path directory,
+ int bucket) throws IOException {
+ for(FileStatus stat: fs.listStatus(directory)) {
+ String name = stat.getPath().getName();
+ if (Integer.parseInt(name.substring(0, name.indexOf('_'))) == bucket) {
+ return stat.getPath();
+ }
+ }
+ throw new IllegalArgumentException("Can't find bucket " + bucket + " in " +
+ directory);
+ }
+
+ @Override
+ public RawReader<OrcStruct> getRawReader(Configuration conf,
+ boolean collapseEvents,
+ int bucket,
+ ValidTxnList validTxnList,
+ Path baseDirectory,
+ Path[] deltaDirectory
+ ) throws IOException {
+ Reader reader = null;
+ boolean isOriginal = false;
+ if (baseDirectory != null) {
+ Path bucketFile;
+ if (baseDirectory.getName().startsWith(AcidUtils.BASE_PREFIX)) {
+ bucketFile = AcidUtils.createBucketFile(baseDirectory, bucket);
+ } else {
+ isOriginal = true;
+ bucketFile = findOriginalBucket(baseDirectory.getFileSystem(conf),
+ baseDirectory, bucket);
+ }
+ reader = OrcFile.createReader(bucketFile, OrcFile.readerOptions(conf));
+ }
+ return new OrcRawRecordMerger(conf, collapseEvents, reader, isOriginal,
+ bucket, validTxnList, new Reader.Options(), deltaDirectory);
+ }
+
+
}
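The anonymous RecordReader returned by getRecordReader() above is what
keeps the classic NullWritable API working on ACID data: it pulls merged
events and silently skips DELETE events, so callers only ever see live
rows. A pure-Java sketch of that loop, with Op standing in for the ACTION
enum from the class javadoc (Event is an illustrative type, not a Hive
class):

import java.util.Iterator;
import java.util.List;

class DeleteFilterSketch {
  enum Op { INSERT, UPDATE, DELETE }
  record Event(Op op, String row) {}

  // returns the next visible row, or null when the stream is exhausted
  static Event nextVisible(Iterator<Event> events) {
    while (events.hasNext()) {
      Event e = events.next();
      if (e.op() != Op.DELETE) {   // deletes are masked from old readers
        return e;
      }
    }
    return null;
  }

  public static void main(String[] args) {
    Iterator<Event> it = List.of(
        new Event(Op.INSERT, "a"),
        new Event(Op.DELETE, "b"),
        new Event(Op.UPDATE, "c")).iterator();
    for (Event e; (e = nextVisible(it)) != null; ) {
      System.out.println(e);    // prints the INSERT and UPDATE rows only
    }
  }
}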
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java?rev=1581977&r1=1581976&r2=1581977&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java Wed Mar 26 18:14:37 2014
@@ -48,8 +48,8 @@ public class OrcNewInputFormat extends I
Path path = fileSplit.getPath();
Configuration conf = ShimLoader.getHadoopShims()
.getConfiguration(context);
- FileSystem fs = path.getFileSystem(conf);
- return new OrcRecordReader(OrcFile.createReader(fs, path, conf),
+ return new OrcRecordReader(OrcFile.createReader(path,
+ OrcFile.readerOptions(conf)),
ShimLoader.getHadoopShims().getConfiguration(context),
fileSplit.getStart(), fileSplit.getLength());
}
@@ -118,15 +118,14 @@ public class OrcNewInputFormat extends I
public List<InputSplit> getSplits(JobContext jobContext)
throws IOException, InterruptedException {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
- List<OrcInputFormat.Context.FileSplitInfo> splits =
+ Configuration conf =
+ ShimLoader.getHadoopShims().getConfiguration(jobContext);
+ List<OrcSplit> splits =
OrcInputFormat.generateSplitsInfo(ShimLoader.getHadoopShims()
.getConfiguration(jobContext));
List<InputSplit> result = new ArrayList<InputSplit>();
- for (OrcInputFormat.Context.FileSplitInfo split : splits) {
- FileSplit newSplit = new OrcNewSplit(split.getPath(),
- split.getStart(), split.getLength(), split.getLocations(),
- split.getFileMetaInfo());
- result.add(newSplit);
+ for(OrcSplit split: OrcInputFormat.generateSplitsInfo(conf)) {
+ result.add(new OrcNewSplit(split));
}
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
return result;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java?rev=1581977&r1=1581976&r2=1581977&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java Wed Mar 26 18:14:37 2014
@@ -21,9 +21,9 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.orc.Reader.FileMetaInfo;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -33,31 +33,42 @@ import org.apache.hadoop.mapreduce.lib.i
*
*/
public class OrcNewSplit extends FileSplit {
- private Reader.FileMetaInfo fileMetaInfo;
+ private ReaderImpl.FileMetaInfo fileMetaInfo;
private boolean hasFooter;
-
+ private boolean isOriginal;
+ private boolean hasBase;
+ private final List<Long> deltas = new ArrayList<Long>();
+
protected OrcNewSplit(){
//The FileSplit() constructor in hadoop 0.20 and 1.x is package private so can't use it.
//This constructor is used to create the object and then call readFields()
// so just pass nulls to this super constructor.
- super(null, 0, 0, (String[])null);
+ super(null, 0, 0, null);
}
- public OrcNewSplit(Path path, long offset, long length, String[] hosts,
- FileMetaInfo fileMetaInfo) {
- super(path, offset, length, hosts);
- this.fileMetaInfo = fileMetaInfo;
- hasFooter = this.fileMetaInfo != null;
+ public OrcNewSplit(OrcSplit inner) throws IOException {
+ super(inner.getPath(), inner.getStart(), inner.getLength(),
+ inner.getLocations());
+ this.fileMetaInfo = inner.getFileMetaInfo();
+ this.hasFooter = inner.hasFooter();
+ this.isOriginal = inner.isOriginal();
+ this.hasBase = inner.hasBase();
+ this.deltas.addAll(inner.getDeltas());
}
-
+
@Override
public void write(DataOutput out) throws IOException {
//serialize path, offset, length using FileSplit
super.write(out);
- // Whether footer information follows.
- out.writeBoolean(hasFooter);
-
+ int flags = (hasBase ? OrcSplit.BASE_FLAG : 0) |
+ (isOriginal ? OrcSplit.ORIGINAL_FLAG : 0) |
+ (hasFooter ? OrcSplit.FOOTER_FLAG : 0);
+ out.writeByte(flags);
+ out.writeInt(deltas.size());
+ for(Long delta: deltas) {
+ out.writeLong(delta);
+ }
if (hasFooter) {
// serialize FileMetaInfo fields
Text.writeString(out, fileMetaInfo.compressionType);
@@ -74,14 +85,22 @@ public class OrcNewSplit extends FileSpl
footerBuff.limit() - footerBuff.position());
}
}
-
+
@Override
public void readFields(DataInput in) throws IOException {
//deserialize path, offset, length using FileSplit
super.readFields(in);
- hasFooter = in.readBoolean();
-
+ byte flags = in.readByte();
+ hasFooter = (OrcSplit.FOOTER_FLAG & flags) != 0;
+ isOriginal = (OrcSplit.ORIGINAL_FLAG & flags) != 0;
+ hasBase = (OrcSplit.BASE_FLAG & flags) != 0;
+
+ deltas.clear();
+ int numDeltas = in.readInt();
+ for(int i=0; i < numDeltas; i++) {
+ deltas.add(in.readLong());
+ }
if (hasFooter) {
// deserialize FileMetaInfo fields
String compressionType = Text.readString(in);
@@ -93,15 +112,28 @@ public class OrcNewSplit extends FileSpl
ByteBuffer footerBuff = ByteBuffer.allocate(footerBuffSize);
in.readFully(footerBuff.array(), 0, footerBuffSize);
- fileMetaInfo = new FileMetaInfo(compressionType, bufferSize, metadataSize, footerBuff);
+ fileMetaInfo = new ReaderImpl.FileMetaInfo(compressionType, bufferSize,
+ metadataSize, footerBuff);
}
}
- public FileMetaInfo getFileMetaInfo(){
+ ReaderImpl.FileMetaInfo getFileMetaInfo(){
return fileMetaInfo;
}
public boolean hasFooter() {
return hasFooter;
}
+
+ public boolean isOriginal() {
+ return isOriginal;
+ }
+
+ public boolean hasBase() {
+ return hasBase;
+ }
+
+ public List<Long> getDeltas() {
+ return deltas;
+ }
}
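OrcNewSplit.write() and readFields() above pack hasBase, isOriginal, and
hasFooter into a single flag byte followed by the delta list. A round-trip
sketch of the flag encoding, using illustrative flag values (the real
constants live in OrcSplit and may differ):

import java.io.*;

class FlagByteSketch {
  static final int BASE_FLAG = 4, ORIGINAL_FLAG = 2, FOOTER_FLAG = 1;

  public static void main(String[] args) throws IOException {
    boolean hasBase = true, isOriginal = false, hasFooter = true;

    // serialize: OR the booleans into one byte
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(buf);
    out.writeByte((hasBase ? BASE_FLAG : 0) |
                  (isOriginal ? ORIGINAL_FLAG : 0) |
                  (hasFooter ? FOOTER_FLAG : 0));

    // deserialize: mask each flag back out
    DataInput in = new DataInputStream(
        new ByteArrayInputStream(buf.toByteArray()));
    byte flags = in.readByte();
    System.out.println("hasBase=" + ((flags & BASE_FLAG) != 0) +
        " isOriginal=" + ((flags & ORIGINAL_FLAG) != 0) +
        " hasFooter=" + ((flags & FOOTER_FLAG) != 0));
  }
}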
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java?rev=1581977&r1=1581976&r2=1581977&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java Wed Mar 26 18:14:37 2014
@@ -19,12 +19,19 @@ package org.apache.hadoop.hive.ql.io.orc
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.FSRecordWriter;
-import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde.OrcSerdeRow;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileOutputFormat;
@@ -34,6 +41,7 @@ import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.util.Progressable;
import java.io.IOException;
+import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Properties;
@@ -41,7 +49,7 @@ import java.util.Properties;
* A Hive OutputFormat for ORC files.
*/
public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow>
- implements HiveOutputFormat<NullWritable, OrcSerdeRow> {
+ implements AcidOutputFormat<OrcSerdeRow> {
private static class OrcRecordWriter
implements RecordWriter<NullWritable, OrcSerdeRow>,
@@ -179,4 +187,135 @@ public class OrcOutputFormat extends Fil
Progressable reporter) throws IOException {
return new OrcRecordWriter(path, getOptions(conf,tableProperties));
}
+
+ private class DummyOrcRecordUpdater implements RecordUpdater {
+ private final Path path;
+ private final ObjectInspector inspector;
+ private final PrintStream out;
+
+ private DummyOrcRecordUpdater(Path path, Options options) {
+ this.path = path;
+ this.inspector = options.getInspector();
+ this.out = options.getDummyStream();
+ }
+
+ @Override
+ public void insert(long currentTransaction, Object row) throws IOException {
+ out.println("insert " + path + " currTxn: " + currentTransaction +
+ " obj: " + stringifyObject(row, inspector));
+ }
+
+ @Override
+ public void update(long currentTransaction, long originalTransaction,
+ long rowId, Object row) throws IOException {
+ out.println("update " + path + " currTxn: " + currentTransaction +
+ " origTxn: " + originalTransaction + " row: " + rowId + " obj: " +
+ stringifyObject(row, inspector));
+ }
+
+ @Override
+ public void delete(long currentTransaction, long originalTransaction,
+ long rowId) throws IOException {
+ out.println("delete " + path + " currTxn: " + currentTransaction +
+ " origTxn: " + originalTransaction + " row: " + rowId);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ out.println("flush " + path);
+ }
+
+ @Override
+ public void close(boolean abort) throws IOException {
+ out.println("close " + path);
+ }
+
+ @Override
+ public SerDeStats getStats() {
+ return null;
+ }
+
+ private void stringifyObject(StringBuilder buffer,
+ Object obj,
+ ObjectInspector inspector
+ ) throws IOException {
+ if (inspector instanceof StructObjectInspector) {
+ buffer.append("{ ");
+ StructObjectInspector soi = (StructObjectInspector) inspector;
+ boolean isFirst = true;
+ for(StructField field: soi.getAllStructFieldRefs()) {
+ if (isFirst) {
+ isFirst = false;
+ } else {
+ buffer.append(", ");
+ }
+ buffer.append(field.getFieldName());
+ buffer.append(": ");
+ stringifyObject(buffer, soi.getStructFieldData(obj, field),
+ field.getFieldObjectInspector());
+ }
+ buffer.append(" }");
+ } else if (inspector instanceof PrimitiveObjectInspector) {
+ PrimitiveObjectInspector poi = (PrimitiveObjectInspector) inspector;
+ buffer.append(poi.getPrimitiveJavaObject(obj).toString());
+ } else {
+ buffer.append("*unknown*");
+ }
+ }
+
+ private String stringifyObject(Object obj,
+ ObjectInspector inspector
+ ) throws IOException {
+ StringBuilder buffer = new StringBuilder();
+ stringifyObject(buffer, obj, inspector);
+ return buffer.toString();
+ }
+ }
+
+ @Override
+ public RecordUpdater getRecordUpdater(Path path,
+ Options options) throws IOException {
+ if (options.getDummyStream() != null) {
+ return new DummyOrcRecordUpdater(path, options);
+ } else {
+ return new OrcRecordUpdater(path, options);
+ }
+ }
+
+ @Override
+ public FSRecordWriter getRawRecordWriter(Path path,
+ Options options) throws IOException {
+ final Path filename = AcidUtils.createFilename(path, options);
+ final OrcFile.WriterOptions opts =
+ OrcFile.writerOptions(options.getConfiguration());
+ if (!options.isWritingBase()) {
+ opts.bufferSize(OrcRecordUpdater.DELTA_BUFFER_SIZE)
+ .stripeSize(OrcRecordUpdater.DELTA_STRIPE_SIZE)
+ .blockPadding(false)
+ .compress(CompressionKind.NONE)
+ .rowIndexStride(0);
+ }
+ final OrcRecordUpdater.KeyIndexBuilder watcher =
+ new OrcRecordUpdater.KeyIndexBuilder();
+ opts.inspector(options.getInspector())
+ .callback(watcher);
+ final Writer writer = OrcFile.createWriter(filename, opts);
+ return new FSRecordWriter() {
+ @Override
+ public void write(Writable w) throws IOException {
+ OrcStruct orc = (OrcStruct) w;
+ watcher.addKey(
+ ((LongWritable)
+ orc.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION)).get(),
+ ((IntWritable) orc.getFieldValue(OrcRecordUpdater.BUCKET)).get(),
+ ((LongWritable) orc.getFieldValue(OrcRecordUpdater.ROW_ID)).get());
+ writer.addRow(w);
+ }
+
+ @Override
+ public void close(boolean abort) throws IOException {
+ writer.close();
+ }
+ };
+ }
}
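stringifyObject() above recurses through a Hive ObjectInspector tree,
printing structs as { field: value, ... } and primitives directly. The
same recursion, sketched over plain Java Maps instead of inspectors (a
hypothetical analogue for illustration, not the Hive API):

import java.util.LinkedHashMap;
import java.util.Map;

class StringifySketch {
  static String stringify(Object obj) {
    if (obj instanceof Map<?, ?> struct) {
      StringBuilder buffer = new StringBuilder("{ ");
      boolean isFirst = true;
      for (Map.Entry<?, ?> field : struct.entrySet()) {
        if (!isFirst) {
          buffer.append(", ");
        }
        isFirst = false;
        buffer.append(field.getKey()).append(": ")
              .append(stringify(field.getValue()));  // recurse into children
      }
      return buffer.append(" }").toString();
    }
    return obj == null ? "null" : obj.toString();    // primitive case
  }

  public static void main(String[] args) {
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("id", 7L);
    row.put("name", "alice");
    System.out.println(stringify(row));  // { id: 7, name: alice }
  }
}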
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java?rev=1581977&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java Wed Mar 26 18:14:37 2014
@@ -0,0 +1,661 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.ql.io.AcidInputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Merges a base and a list of delta files together into a single stream of
+ * events.
+ */
+public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
+
+ private static final Log LOG = LogFactory.getLog(OrcRawRecordMerger.class);
+
+ private final Configuration conf;
+ private final boolean collapse;
+ private final RecordReader baseReader;
+ private final long offset;
+ private final long length;
+ private final ValidTxnList validTxnList;
+ private final int columns;
+ private ReaderKey prevKey = new ReaderKey();
+ // this is the key less than the lowest key we need to process
+ private RecordIdentifier minKey;
+ // this is the last key we need to process
+ private RecordIdentifier maxKey;
+ // an extra value so that we can return it while reading ahead
+ private OrcStruct extraValue;
+
+ /**
+ * A RecordIdentifier extended with the current transaction id. This is the
+ * key of our merge sort with the originalTransaction, bucket, and rowId
+ * ascending and the currentTransaction descending. This means that if the
+ * reader is collapsing events to just the last update, only the first
+ * instance of each record is required.
+ */
+ final static class ReaderKey extends RecordIdentifier{
+ private long currentTransactionId;
+
+ public ReaderKey() {
+ this(-1, -1, -1, -1);
+ }
+
+ public ReaderKey(long originalTransaction, int bucket, long rowId,
+ long currentTransactionId) {
+ super(originalTransaction, bucket, rowId);
+ this.currentTransactionId = currentTransactionId;
+ }
+
+ @Override
+ public void set(RecordIdentifier other) {
+ super.set(other);
+ currentTransactionId = ((ReaderKey) other).currentTransactionId;
+ }
+
+ public void setValues(long originalTransactionId,
+ int bucket,
+ long rowId,
+ long currentTransactionId) {
+ setValues(originalTransactionId, bucket, rowId);
+ this.currentTransactionId = currentTransactionId;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ return super.equals(other) &&
+ currentTransactionId == ((ReaderKey) other).currentTransactionId;
+ }
+
+ @Override
+ public int compareTo(RecordIdentifier other) {
+ int sup = compareToInternal(other);
+ if (sup == 0) {
+ if (other.getClass() == ReaderKey.class) {
+ ReaderKey oth = (ReaderKey) other;
+ if (currentTransactionId != oth.currentTransactionId) {
+ return currentTransactionId < oth.currentTransactionId ? +1 : -1;
+ }
+ } else {
+ return -1;
+ }
+ }
+ return sup;
+ }
+
+ public long getCurrentTransactionId() {
+ return currentTransactionId;
+ }
+
+ /**
+ * Compare rows without considering the currentTransactionId.
+ * @param other the value to compare to
+ * @return -1, 0, +1
+ */
+ public int compareRow(RecordIdentifier other) {
+ return compareToInternal(other);
+ }
+
+ @Override
+ public String toString() {
+ return "{originalTxn: " + getTransactionId() + ", bucket: " +
+ getBucketId() + ", row: " + getRowId() + ", currentTxn: " +
+ currentTransactionId + "}";
+ }
+ }
+
+  /**
+   * A reader and the next record from that reader. The code reads ahead so
+   * that we can return the lowest ReaderKey from all of the readers. Thus,
+   * the next available row is in nextRecord, and only the records after it
+   * remain in the reader.
+   */
+ static class ReaderPair {
+ OrcStruct nextRecord;
+ final Reader reader;
+ final RecordReader recordReader;
+ final ReaderKey key;
+ final RecordIdentifier maxKey;
+ final int bucket;
+
+    /**
+     * Create a reader that reads from the first key larger than minKey up to
+     * and including maxKey.
+     * @param key the key to read into
+     * @param reader the ORC file reader
+     * @param bucket the bucket number for the file
+     * @param minKey only return keys larger than minKey if it is non-null
+     * @param maxKey only return keys less than or equal to maxKey if it is
+     *               non-null
+     * @param options the options for reading the rows
+     * @throws IOException
+     */
+    ReaderPair(ReaderKey key, Reader reader, int bucket,
+               RecordIdentifier minKey, RecordIdentifier maxKey,
+               Reader.Options options) throws IOException {
+ this.reader = reader;
+ this.key = key;
+ this.maxKey = maxKey;
+ this.bucket = bucket;
+ // TODO use stripe statistics to jump over stripes
+ recordReader = reader.rowsOptions(options);
+      // advance the reader until we get past the minimum key
+ do {
+ next(nextRecord);
+ } while (nextRecord != null &&
+ (minKey != null && key.compareRow(minKey) <= 0));
+ }
+
+ void next(OrcStruct next) throws IOException {
+ if (recordReader.hasNext()) {
+ nextRecord = (OrcStruct) recordReader.next(next);
+ // set the key
+ key.setValues(OrcRecordUpdater.getOriginalTransaction(nextRecord),
+ OrcRecordUpdater.getBucket(nextRecord),
+ OrcRecordUpdater.getRowId(nextRecord),
+ OrcRecordUpdater.getCurrentTransaction(nextRecord));
+
+ // if this record is larger than maxKey, we need to stop
+ if (maxKey != null && key.compareRow(maxKey) > 0) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("key " + key + " > maxkey " + maxKey);
+          }
+ nextRecord = null;
+ recordReader.close();
+ }
+ } else {
+ nextRecord = null;
+ recordReader.close();
+ }
+ }
+
+ int getColumns() {
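+      // type 0 is the event struct and its first five fields are primitive,
+      // so the user's row struct has type id ROW + 1; its subtype count is
+      // the number of user columns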
+ return reader.getTypes().get(OrcRecordUpdater.ROW + 1).getSubtypesCount();
+ }
+ }
+
+  /**
+   * A reader that pretends an original base file is an ACID-format base
+   * file. It wraps each row from the underlying reader in an ACID event
+   * object and fills in the transaction metadata fields.
+   */
+ static final class OriginalReaderPair extends ReaderPair {
+ OriginalReaderPair(ReaderKey key, Reader reader, int bucket,
+ RecordIdentifier minKey, RecordIdentifier maxKey,
+ Reader.Options options) throws IOException {
+ super(key, reader, bucket, minKey, maxKey, options);
+ }
+
+ @Override
+ void next(OrcStruct next) throws IOException {
+ if (recordReader.hasNext()) {
+ long nextRowId = recordReader.getRowNumber();
+        // the event object has to be allocated here rather than in the
+        // constructor, because the super constructor calls next() before
+        // this subclass's constructor body runs
+ if (next == null) {
+ nextRecord = new OrcStruct(OrcRecordUpdater.FIELDS);
+ IntWritable operation =
+ new IntWritable(OrcRecordUpdater.INSERT_OPERATION);
+ nextRecord.setFieldValue(OrcRecordUpdater.OPERATION, operation);
+ nextRecord.setFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION,
+ new LongWritable(0));
+ nextRecord.setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION,
+ new LongWritable(0));
+ nextRecord.setFieldValue(OrcRecordUpdater.BUCKET,
+ new IntWritable(bucket));
+ nextRecord.setFieldValue(OrcRecordUpdater.ROW_ID,
+ new LongWritable(nextRowId));
+ nextRecord.setFieldValue(OrcRecordUpdater.ROW,
+ recordReader.next(null));
+ } else {
+ nextRecord = next;
+ ((IntWritable) next.getFieldValue(OrcRecordUpdater.OPERATION))
+ .set(OrcRecordUpdater.INSERT_OPERATION);
+ ((LongWritable) next.getFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION))
+ .set(0);
+ ((IntWritable) next.getFieldValue(OrcRecordUpdater.BUCKET))
+ .set(bucket);
+ ((LongWritable) next.getFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION))
+ .set(0);
+          ((LongWritable) next.getFieldValue(OrcRecordUpdater.ROW_ID))
+              .set(nextRowId);
+ nextRecord.setFieldValue(OrcRecordUpdater.ROW,
+ recordReader.next(OrcRecordUpdater.getRow(next)));
+ }
+ key.setValues(0L, bucket, nextRowId, 0L);
+ if (maxKey != null && key.compareRow(maxKey) > 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("key " + key + " > maxkey " + maxKey);
+ }
+ nextRecord = null;
+ recordReader.close();
+ }
+ } else {
+ nextRecord = null;
+ recordReader.close();
+ }
+ }
+
+ @Override
+ int getColumns() {
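+      // an original file stores raw rows, so the root struct's subtype
+      // count is the number of user columns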
+ return reader.getTypes().get(0).getSubtypesCount();
+ }
+ }
+
+ private final TreeMap<ReaderKey, ReaderPair> readers =
+ new TreeMap<ReaderKey, ReaderPair>();
+
+ // The reader that currently has the lowest key.
+ private ReaderPair primary;
+
+ // The key of the next lowest reader.
+ private ReaderKey secondaryKey = null;
+
+  /**
+   * Find the key range for original bucket files and store it in the minKey
+   * and maxKey fields.
+   * @param reader the reader
+   * @param bucket the bucket number we are reading
+   * @param options the options for reading
+   * @throws IOException
+   */
+ private void discoverOriginalKeyBounds(Reader reader, int bucket,
+ Reader.Options options
+ ) throws IOException {
+ long rowLength = 0;
+ long rowOffset = 0;
+ long offset = options.getOffset();
+ long maxOffset = options.getMaxOffset();
+ boolean isTail = true;
+ for(StripeInformation stripe: reader.getStripes()) {
+ if (offset > stripe.getOffset()) {
+ rowOffset += stripe.getNumberOfRows();
+ } else if (maxOffset > stripe.getOffset()) {
+ rowLength += stripe.getNumberOfRows();
+ } else {
+ isTail = false;
+ break;
+ }
+ }
+ if (rowOffset > 0) {
+ minKey = new RecordIdentifier(0, bucket, rowOffset - 1);
+ }
+ if (!isTail) {
+ maxKey = new RecordIdentifier(0, bucket, rowOffset + rowLength - 1);
+ }
+ }
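+
+  // For example, with three 100-row stripes at offsets 0, 1000, and 2000, a
+  // split covering only the second stripe gets minKey = (0, bucket, 99) and
+  // maxKey = (0, bucket, 199), i.e. it owns synthetic row ids 100 to 199.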
+
+  /**
+   * Find the key range for ACID bucket files and store it in the minKey and
+   * maxKey fields.
+   * @param reader the reader
+   * @param options the options for reading
+   * @throws IOException
+   */
+ private void discoverKeyBounds(Reader reader,
+ Reader.Options options) throws IOException {
+ RecordIdentifier[] keyIndex = OrcRecordUpdater.parseKeyIndex(reader);
+ long offset = options.getOffset();
+ long maxOffset = options.getMaxOffset();
+ int firstStripe = 0;
+ int stripeCount = 0;
+ boolean isTail = true;
+ List<StripeInformation> stripes = reader.getStripes();
+ for(StripeInformation stripe: stripes) {
+ if (offset > stripe.getOffset()) {
+ firstStripe += 1;
+ } else if (maxOffset > stripe.getOffset()) {
+ stripeCount += 1;
+ } else {
+ isTail = false;
+ break;
+ }
+ }
+ if (firstStripe != 0) {
+ minKey = keyIndex[firstStripe - 1];
+ }
+ if (!isTail) {
+ maxKey = keyIndex[firstStripe + stripeCount - 1];
+ }
+ }
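+
+  // Here keyIndex[i] is the last key in stripe i, so keyIndex[firstStripe - 1]
+  // is the exclusive lower bound and keyIndex[firstStripe + stripeCount - 1]
+  // the inclusive upper bound for this split.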
+
+ /**
+ * Convert from the row include/sarg/columnNames to the event equivalent
+ * for the underlying file.
+ * @param options options for the row reader
+ * @return a cloned options object that is modified for the event reader
+ */
+ static Reader.Options createEventOptions(Reader.Options options) {
+ Reader.Options result = options.clone();
+ result.range(options.getOffset(), Long.MAX_VALUE);
+ // slide the columns down by 6 for the include array
+ if (options.getInclude() != null) {
+ boolean[] orig = options.getInclude();
+ // we always need the base row
+ orig[0] = true;
+ boolean[] include = new boolean[orig.length + OrcRecordUpdater.FIELDS];
+ Arrays.fill(include, 0, OrcRecordUpdater.FIELDS, true);
+ for(int i= 0; i < orig.length; ++i) {
+ include[i + OrcRecordUpdater.FIELDS] = orig[i];
+ }
+ result.include(include);
+ }
+
+ // slide the column names down by 6 for the name array
+ if (options.getColumnNames() != null) {
+ String[] orig = options.getColumnNames();
+ String[] cols = new String[orig.length + OrcRecordUpdater.FIELDS];
+ for(int i=0; i < orig.length; ++i) {
+ cols[i + OrcRecordUpdater.FIELDS] = orig[i];
+ }
+ result.searchArgument(options.getSearchArgument(), cols);
+ }
+ return result;
+ }
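+
+  // For example, with OrcRecordUpdater.FIELDS == 6 a row include array of
+  // [root, c0, c1] becomes [true, true, true, true, true, true, root, c0, c1]:
+  // the event struct and its five metadata columns are always read, and the
+  // row's flags shift down by FIELDS.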
+
+  /**
+   * Create a reader that merge sorts the ACID events together.
+   * @param conf the configuration
+   * @param collapseEvents should the events on the same row be collapsed
+   * @param reader the reader for the base file, or null if there is none
+   * @param isOriginal is the base file a pre-acid file
+   * @param bucket the bucket we are reading
+   * @param validTxnList the list of transactions that are valid to read
+   * @param options the options to read with
+   * @param deltaDirectory the list of delta directories to include
+   * @throws IOException
+   */
+ OrcRawRecordMerger(Configuration conf,
+ boolean collapseEvents,
+ Reader reader,
+ boolean isOriginal,
+ int bucket,
+ ValidTxnList validTxnList,
+ Reader.Options options,
+ Path[] deltaDirectory) throws IOException {
+ this.conf = conf;
+ this.collapse = collapseEvents;
+ this.offset = options.getOffset();
+ this.length = options.getLength();
+ this.validTxnList = validTxnList;
+    // modify the options to reflect the event schema instead of the base row
+ Reader.Options eventOptions = createEventOptions(options);
+ if (reader == null) {
+ baseReader = null;
+ } else {
+
+ // find the min/max based on the offset and length
+ if (isOriginal) {
+ discoverOriginalKeyBounds(reader, bucket, options);
+ } else {
+ discoverKeyBounds(reader, options);
+ }
+ LOG.info("min key = " + minKey + ", max key = " + maxKey);
+ // use the min/max instead of the byte range
+ ReaderPair pair;
+ ReaderKey key = new ReaderKey();
+ if (isOriginal) {
+ options = options.clone();
+ options.range(options.getOffset(), Long.MAX_VALUE);
+ pair = new OriginalReaderPair(key, reader, bucket, minKey, maxKey,
+ options);
+ } else {
+ pair = new ReaderPair(key, reader, bucket, minKey, maxKey,
+ eventOptions);
+ }
+
+ // if there is at least one record, put it in the map
+ if (pair.nextRecord != null) {
+ readers.put(key, pair);
+ }
+ baseReader = pair.recordReader;
+ }
+
+    // we always want to read all of the deltas: the split's byte range only
+    // applies to the base, since delta events are filtered by key instead
+ eventOptions.range(0, Long.MAX_VALUE);
+ if (deltaDirectory != null) {
+ for(Path delta: deltaDirectory) {
+ ReaderKey key = new ReaderKey();
+ Path deltaFile = AcidUtils.createBucketFile(delta, bucket);
+ FileSystem fs = deltaFile.getFileSystem(conf);
+ long length = getLastFlushLength(fs, deltaFile);
+ if (fs.exists(deltaFile) && length != -1) {
+ Reader deltaReader = OrcFile.createReader(deltaFile,
+ OrcFile.readerOptions(conf).maxLength(length));
+ ReaderPair deltaPair = new ReaderPair(key, deltaReader, bucket, minKey,
+ maxKey, eventOptions);
+ if (deltaPair.nextRecord != null) {
+ readers.put(key, deltaPair);
+ }
+ }
+ }
+ }
+
+ // get the first record
+ Map.Entry<ReaderKey, ReaderPair> entry = readers.pollFirstEntry();
+ if (entry == null) {
+ columns = 0;
+ primary = null;
+ } else {
+ primary = entry.getValue();
+ if (readers.isEmpty()) {
+ secondaryKey = null;
+ } else {
+ secondaryKey = readers.firstKey();
+ }
+ // get the number of columns in the user's rows
+ columns = primary.getColumns();
+ }
+ }
+
+  /**
+   * Read the side file to get the last flush length.
+   * @param fs the file system to use
+   * @param deltaFile the path of the delta file
+   * @return the maximum size of the file to use, Long.MAX_VALUE if there is
+   *    no side file, or -1 if the side file is empty
+   * @throws IOException
+   */
+ private static long getLastFlushLength(FileSystem fs,
+ Path deltaFile) throws IOException {
+ Path lengths = OrcRecordUpdater.getSideFile(deltaFile);
+ long result = Long.MAX_VALUE;
+    FSDataInputStream stream = null;
+    try {
+      stream = fs.open(lengths);
+      result = -1;
+      while (stream.available() > 0) {
+        result = stream.readLong();
+      }
+      return result;
+    } catch (IOException ioe) {
+      // the side file is missing or unreadable; use the best answer we have
+      return result;
+    } finally {
+      if (stream != null) {
+        stream.close();
+      }
+    }
+ }
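+
+  // The side file is appended with the flushed length after each flush, so
+  // the last long in it should be the longest prefix of the delta file that
+  // is safe to read.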
+
+ @VisibleForTesting
+ RecordIdentifier getMinKey() {
+ return minKey;
+ }
+
+ @VisibleForTesting
+ RecordIdentifier getMaxKey() {
+ return maxKey;
+ }
+
+ @VisibleForTesting
+ ReaderPair getCurrentReader() {
+ return primary;
+ }
+
+ @VisibleForTesting
+ Map<ReaderKey, ReaderPair> getOtherReaders() {
+ return readers;
+ }
+
+ @Override
+ public boolean next(RecordIdentifier recordIdentifier,
+ OrcStruct prev) throws IOException {
+ boolean keysSame = true;
+ while (keysSame && primary != null) {
+
+ // The primary's nextRecord is the next value to return
+ OrcStruct current = primary.nextRecord;
+ recordIdentifier.set(primary.key);
+
+ // Advance the primary reader to the next record
+ primary.next(extraValue);
+
+ // Save the current record as the new extraValue for next time so that
+ // we minimize allocations
+ extraValue = current;
+
+ // now that the primary reader has advanced, we need to see if we
+ // continue to read it or move to the secondary.
+ if (primary.nextRecord == null ||
+ primary.key.compareTo(secondaryKey) > 0) {
+
+ // if the primary isn't done, push it back into the readers
+ if (primary.nextRecord != null) {
+ readers.put(primary.key, primary);
+ }
+
+ // update primary and secondaryKey
+ Map.Entry<ReaderKey, ReaderPair> entry = readers.pollFirstEntry();
+ if (entry != null) {
+ primary = entry.getValue();
+ if (readers.isEmpty()) {
+ secondaryKey = null;
+ } else {
+ secondaryKey = readers.firstKey();
+ }
+ } else {
+ primary = null;
+ }
+ }
+
+      // if this record's transaction isn't committed in our snapshot, skip it
+ if (!validTxnList.isTxnCommitted(
+ ((ReaderKey) recordIdentifier).getCurrentTransactionId())) {
+ continue;
+ }
+
+ // if we are collapsing, figure out if this is a new row
+ if (collapse) {
+ keysSame = prevKey.compareRow(recordIdentifier) == 0;
+ if (!keysSame) {
+ prevKey.set(recordIdentifier);
+ }
+ } else {
+ keysSame = false;
+ }
+
+ // set the output record by fiddling with the pointers so that we can
+ // avoid a copy.
+ prev.linkFields(current);
+ }
+ return !keysSame;
+ }
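+
+  // For example, with collapse on and committed events keyed
+  // (row 5, txn 9), (row 5, txn 7), (row 6, txn 7), next() returns the txn 9
+  // version of row 5, skips the older version, and returns row 6 on the
+  // following call.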
+
+ @Override
+ public RecordIdentifier createKey() {
+ return new ReaderKey();
+ }
+
+ @Override
+ public OrcStruct createValue() {
+ return new OrcStruct(OrcRecordUpdater.FIELDS);
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return offset + (long)(getProgress() * length);
+ }
+
+  @Override
+  public void close() throws IOException {
+    if (primary != null) {
+      primary.recordReader.close();
+    }
+    for(ReaderPair pair: readers.values()) {
+      pair.recordReader.close();
+    }
+  }
+
+ @Override
+ public float getProgress() throws IOException {
+ return baseReader == null ? 1 : baseReader.getProgress();
+ }
+
+ @Override
+ public ObjectInspector getObjectInspector() {
+ // Read the configuration parameters
+ String columnNameProperty = conf.get(serdeConstants.LIST_COLUMNS);
+ // NOTE: if "columns.types" is missing, all columns will be of String type
+ String columnTypeProperty = conf.get(serdeConstants.LIST_COLUMN_TYPES);
+
+ // Parse the configuration parameters
+ ArrayList<String> columnNames = new ArrayList<String>();
+ if (columnNameProperty != null && columnNameProperty.length() > 0) {
+ Collections.addAll(columnNames, columnNameProperty.split(","));
+ }
+ if (columnTypeProperty == null) {
+ // Default type: all string
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < columnNames.size(); i++) {
+ if (i > 0) {
+ sb.append(":");
+ }
+ sb.append("string");
+ }
+ columnTypeProperty = sb.toString();
+ }
+
+ ArrayList<TypeInfo> fieldTypes =
+ TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+ StructTypeInfo rowType = new StructTypeInfo();
+ rowType.setAllStructFieldNames(columnNames);
+ rowType.setAllStructFieldTypeInfos(fieldTypes);
+    return OrcRecordUpdater.createEventSchema(
+        OrcStruct.createObjectInspector(rowType));
+ }
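+
+  // For example, a conf with "columns" = "id,name" and "columns.types" =
+  // "int:string" yields an inspector for an event struct whose row field is
+  // a struct<id:int,name:string>, preceded by the transaction metadata
+  // fields.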
+
+  /**
+   * Get the number of columns in the underlying rows.
+   * @return the column count, or 0 if there is no base and no deltas
+   */
+ public int getColumns() {
+ return columns;
+ }
+}