Posted to commits@hive.apache.org by ga...@apache.org on 2014/09/06 17:38:43 UTC
svn commit: r1622876 - in /hive/trunk/ql/src:
java/org/apache/hadoop/hive/ql/exec/ java/org/apache/hadoop/hive/ql/io/
java/org/apache/hadoop/hive/ql/plan/ test/org/apache/hadoop/hive/ql/exec/
Author: gates
Date: Sat Sep 6 15:38:43 2014
New Revision: 1622876
URL: http://svn.apache.org/r1622876
Log:
HIVE-7078 Need file sink operators that work with ACID (Alan Gates, reviewed by Prasanth J)
Added:
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1622876&r1=1622875&r2=1622876&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Sat Sep 6 15:38:43 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.exec;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
@@ -29,12 +30,15 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveKey;
@@ -62,6 +66,7 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
@@ -100,6 +105,10 @@ public class FileSinkOperator extends Te
private transient List<Object> keyWritables;
private transient List<String> keys;
private transient int numKeyColToRead;
+ private StructField recIdField; // field to find record identifier in
+ private StructField bucketField; // field bucket is in in record id
+ private StructObjectInspector recIdInspector; // OI for inspecting record id
+ private IntObjectInspector bucketInspector; // OI for inspecting bucket id
/**
* RecordWriter.
@@ -117,7 +126,10 @@ public class FileSinkOperator extends Te
Path[] outPaths;
Path[] finalPaths;
RecordWriter[] outWriters;
+ RecordUpdater[] updaters;
Stat stat;
+ int acidLastBucket = -1;
+ int acidFileOffset = -1;
public FSPaths() {
}
@@ -128,6 +140,8 @@ public class FileSinkOperator extends Te
outPaths = new Path[numFiles];
finalPaths = new Path[numFiles];
outWriters = new RecordWriter[numFiles];
+ updaters = new RecordUpdater[numFiles];
+ LOG.debug("Created slots for " + numFiles);
stat = new Stat();
}
@@ -168,6 +182,15 @@ public class FileSinkOperator extends Te
}
}
}
+ try {
+ for (int i = 0; i < updaters.length; i++) {
+ if (updaters[i] != null) {
+ updaters[i].close(abort);
+ }
+ }
+ } catch (IOException e) {
+ throw new HiveException(e);
+ }
}
private void commit(FileSystem fs) throws HiveException {
@@ -177,7 +200,21 @@ public class FileSinkOperator extends Te
&& !fs.exists(finalPaths[idx].getParent())) {
fs.mkdirs(finalPaths[idx].getParent());
}
- if (!fs.rename(outPaths[idx], finalPaths[idx])) {
+ boolean needToRename = true;
+ if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
+ conf.getWriteType() == AcidUtils.Operation.DELETE) {
+ // If we're updating or deleting, there may be no file to close. This can happen
+ // because the where clause strained out all of the records for a given bucket. So
+ // before attempting the rename below, check if our file exists. If it doesn't,
+ // then skip the rename. If it does, try it. We could just blindly try the rename
+ // and avoid the extra stat, but that would mask other errors.
+ try {
+ FileStatus stat = fs.getFileStatus(outPaths[idx]);
+ } catch (FileNotFoundException fnfe) {
+ needToRename = false;
+ }
+ }
+ if (needToRename && !fs.rename(outPaths[idx], finalPaths[idx])) {
throw new HiveException("Unable to rename output from: " +
outPaths[idx] + " to: " + finalPaths[idx]);
}
@@ -350,6 +387,16 @@ public class FileSinkOperator extends Te
valToPaths.put("", fsp); // special entry for non-DP case
}
}
+
+ if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
+ conf.getWriteType() == AcidUtils.Operation.DELETE) {
+ // ROW__ID is always in the first field
+ recIdField = ((StructObjectInspector)outputObjInspector).getAllStructFieldRefs().get(0);
+ recIdInspector = (StructObjectInspector)recIdField.getFieldObjectInspector();
+ // bucket is the second field in the record id
+ bucketField = recIdInspector.getAllStructFieldRefs().get(1);
+ bucketInspector = (IntObjectInspector)bucketField.getFieldObjectInspector();
+ }
initializeChildren(hconf);
} catch (HiveException e) {
throw e;
@@ -420,6 +467,7 @@ public class FileSinkOperator extends Te
assert totalFiles == 1;
}
+ int bucketNum = 0;
if (multiFileSpray) {
key.setHashCode(idx);
@@ -436,7 +484,7 @@ public class FileSinkOperator extends Te
}
}
- int bucketNum = prtner.getBucket(key, null, totalFiles);
+ bucketNum = prtner.getBucket(key, null, totalFiles);
if (seenBuckets.contains(bucketNum)) {
continue;
}
@@ -462,7 +510,8 @@ public class FileSinkOperator extends Te
filesCreated = true;
}
- protected void createBucketForFileIdx(FSPaths fsp, int filesIdx) throws HiveException {
+ protected void createBucketForFileIdx(FSPaths fsp, int filesIdx)
+ throws HiveException {
try {
if (isNativeTable) {
fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, fsp.tmpPath, null);
@@ -493,11 +542,21 @@ public class FileSinkOperator extends Te
Utilities.copyTableJobPropertiesToConf(conf.getTableInfo(), jc);
// only create bucket files only if no dynamic partitions,
// buckets of dynamic partitions will be created for each newly created partition
- fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(jc, conf.getTableInfo(),
- outputClass, conf, fsp.outPaths[filesIdx], reporter);
- // If the record writer provides stats, get it from there instead of the serde
- statsFromRecordWriter[filesIdx] = fsp.outWriters[filesIdx] instanceof StatsProvidingRecordWriter;
- // increment the CREATED_FILES counter
+ if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID) {
+ fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(jc, conf.getTableInfo(),
+ outputClass, conf, fsp.outPaths[filesIdx], reporter);
+ // If the record writer provides stats, get it from there instead of the serde
+ statsFromRecordWriter[filesIdx] = fsp.outWriters[filesIdx] instanceof
+ StatsProvidingRecordWriter;
+ // increment the CREATED_FILES counter
+ } else if (conf.getWriteType() == AcidUtils.Operation.INSERT) {
+ // Only set up the updater for insert. For update and delete we don't know until we see
+ // the row.
+ ObjectInspector inspector = bDynParts ? subSetOI : outputObjInspector;
+ int acidBucketNum = Integer.valueOf(Utilities.getTaskIdFromFilename(taskId));
+ fsp.updaters[filesIdx] = HiveFileFormatUtils.getAcidRecordUpdater(jc, conf.getTableInfo(),
+ acidBucketNum, conf, fsp.outPaths[filesIdx], inspector, reporter, -1);
+ }
if (reporter != null) {
reporter.incrCounter(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVECOUNTERGROUP),
Operator.HIVECOUNTERCREATEDFILES, 1);
@@ -598,27 +657,47 @@ public class FileSinkOperator extends Te
}
- RecordWriter rowOutWriter = null;
-
if (row_count != null) {
row_count.set(row_count.get() + 1);
}
- if (!multiFileSpray) {
- rowOutWriter = rowOutWriters[0];
+ int writerOffset = findWriterOffset(row);
+ // This if/else chain looks ugly in the inner loop, but given that it will be 100% the same
+ // for a given operator, branch prediction should work quite nicely on it.
+ // RecordUpdater expects to get the actual row, not a serialized version of it. Thus we
+ // pass the row rather than recordValue.
+ if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID) {
+ rowOutWriters[writerOffset].write(recordValue);
+ } else if (conf.getWriteType() == AcidUtils.Operation.INSERT) {
+ fpaths.updaters[writerOffset].insert(conf.getTransactionId(), row);
} else {
- int keyHashCode = 0;
- for (int i = 0; i < partitionEval.length; i++) {
- Object o = partitionEval[i].evaluate(row);
- keyHashCode = keyHashCode * 31
- + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
- }
- key.setHashCode(keyHashCode);
- int bucketNum = prtner.getBucket(key, null, totalFiles);
- int idx = bucketMap.get(bucketNum);
- rowOutWriter = rowOutWriters[idx];
+ // TODO I suspect we could skip much of the stuff above this in the function in the case
+ // of update and delete. But I don't understand all of the side effects of the above
+ // code and don't want to skip over it yet.
+
+ // Find the bucket id, and switch buckets if we need to
+ ObjectInspector rowInspector = bDynParts ? subSetOI : outputObjInspector;
+ Object recId = ((StructObjectInspector)rowInspector).getStructFieldData(row, recIdField);
+ int bucketNum =
+ bucketInspector.get(recIdInspector.getStructFieldData(recId, bucketField));
+ if (fpaths.acidLastBucket != bucketNum) {
+ fpaths.acidLastBucket = bucketNum;
+ // Switch files
+ fpaths.updaters[++fpaths.acidFileOffset] = HiveFileFormatUtils.getAcidRecordUpdater(
+ jc, conf.getTableInfo(), bucketNum, conf, fpaths.outPaths[fpaths.acidFileOffset],
+ rowInspector, reporter, 0);
+ LOG.debug("Created updater for bucket number " + bucketNum + " using file " +
+ fpaths.outPaths[fpaths.acidFileOffset]);
+ }
+
+ if (conf.getWriteType() == AcidUtils.Operation.UPDATE) {
+ fpaths.updaters[fpaths.acidFileOffset].update(conf.getTransactionId(), row);
+ } else if (conf.getWriteType() == AcidUtils.Operation.DELETE) {
+ fpaths.updaters[fpaths.acidFileOffset].delete(conf.getTransactionId(), row);
+ } else {
+ throw new HiveException("Unknown write type " + conf.getWriteType().toString());
+ }
}
- rowOutWriter.write(recordValue);
} catch (IOException e) {
throw new HiveException(e);
} catch (SerDeException e) {
@@ -627,6 +706,11 @@ public class FileSinkOperator extends Te
}
protected boolean areAllTrue(boolean[] statsFromRW) {
+ // If we are doing an acid operation, they will always all be true, as RecordUpdaters always
+ // collect stats
+ if (conf.getWriteType() != AcidUtils.Operation.NOT_ACID) {
+ return true;
+ }
for(boolean b : statsFromRW) {
if (!b) {
return false;
@@ -635,6 +719,23 @@ public class FileSinkOperator extends Te
return true;
}
+ private int findWriterOffset(Object row) throws HiveException {
+ if (!multiFileSpray) {
+ return 0;
+ } else {
+ int keyHashCode = 0;
+ for (int i = 0; i < partitionEval.length; i++) {
+ Object o = partitionEval[i].evaluate(row);
+ keyHashCode = keyHashCode * 31
+ + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
+ }
+ key.setHashCode(keyHashCode);
+ int bucketNum = prtner.getBucket(key, null, totalFiles);
+ return bucketMap.get(bucketNum);
+ }
+
+ }
+
/**
* Lookup list bucketing path.
* @param lbDirName
@@ -727,14 +828,16 @@ public class FileSinkOperator extends Te
FSPaths fp;
// get the path corresponding to the dynamic partition columns,
- String dpDir = getDynPartDirectory(row, dpColNames, numDynParts);
+ String dpDir = getDynPartDirectory(row, dpColNames);
String pathKey = null;
if (dpDir != null) {
dpDir = appendToSource(lbDirName, dpDir);
pathKey = dpDir;
+ int numericBucketNum = 0;
if(conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) {
String buckNum = row.get(row.size() - 1);
+ numericBucketNum = Integer.valueOf(buckNum);
taskId = Utilities.replaceTaskIdFromFilename(Utilities.getTaskId(hconf), buckNum);
pathKey = appendToSource(taskId, dpDir);
}
@@ -756,13 +859,18 @@ public class FileSinkOperator extends Te
// since we are closing the previous fsp's record writers, we need to see if we can get
// stats from the record writer and store in the previous fsp that is cached
if (conf.isGatherStats() && isCollectRWStats) {
- RecordWriter outWriter = prevFsp.outWriters[0];
- if (outWriter != null) {
- SerDeStats stats = ((StatsProvidingRecordWriter) outWriter).getStats();
- if (stats != null) {
+ SerDeStats stats = null;
+ if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID) {
+ RecordWriter outWriter = prevFsp.outWriters[0];
+ if (outWriter != null) {
+ stats = ((StatsProvidingRecordWriter) outWriter).getStats();
+ }
+ } else if (prevFsp.updaters[0] != null) {
+ stats = prevFsp.updaters[0].getStats();
+ }
+ if (stats != null) {
prevFsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
prevFsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount());
- }
}
}
@@ -805,8 +913,7 @@ public class FileSinkOperator extends Te
// given the current input row, the mapping for input col info to dp columns, and # of dp cols,
// return the relative path corresponding to the row.
// e.g., ds=2008-04-08/hr=11
- private String getDynPartDirectory(List<String> row, List<String> dpColNames, int numDynParts) {
- assert row.size() == numDynParts && numDynParts == dpColNames.size() : "data length is different from num of DP columns";
+ private String getDynPartDirectory(List<String> row, List<String> dpColNames) {
return FileUtils.makePartName(dpColNames, row);
}
@@ -832,6 +939,7 @@ public class FileSinkOperator extends Te
@Override
public void closeOp(boolean abort) throws HiveException {
+
if (!bDynParts && !filesCreated) {
createBucketFiles(fsp);
}
@@ -849,13 +957,25 @@ public class FileSinkOperator extends Te
// record writer already gathers the statistics, it can simply return the
// accumulated statistics which will be aggregated in case of spray writers
if (conf.isGatherStats() && isCollectRWStats) {
- for (int idx = 0; idx < fsp.outWriters.length; idx++) {
- RecordWriter outWriter = fsp.outWriters[idx];
- if (outWriter != null) {
- SerDeStats stats = ((StatsProvidingRecordWriter) outWriter).getStats();
- if (stats != null) {
- fsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
- fsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount());
+ if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID) {
+ for (int idx = 0; idx < fsp.outWriters.length; idx++) {
+ RecordWriter outWriter = fsp.outWriters[idx];
+ if (outWriter != null) {
+ SerDeStats stats = ((StatsProvidingRecordWriter) outWriter).getStats();
+ if (stats != null) {
+ fsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
+ fsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount());
+ }
+ }
+ }
+ } else {
+ for (int i = 0; i < fsp.updaters.length; i++) {
+ if (fsp.updaters[i] != null) {
+ SerDeStats stats = fsp.updaters[i].getStats();
+ if (stats != null) {
+ fsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
+ fsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount());
+ }
}
}
}
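
For readers tracing the new update/delete path: the patch assumes the planner puts the ROW__ID struct in field 0 of each incoming row, with the bucket id as the second member of that struct. The following is a condensed sketch, restating the inspector setup from initializeOp and the per-row lookup from processOp above (variable names shortened for readability, not a standalone API):

  // Done once, when the operator is initialized for UPDATE/DELETE writes.
  StructObjectInspector rowOI = (StructObjectInspector) outputObjInspector;
  StructField recIdField = rowOI.getAllStructFieldRefs().get(0);    // ROW__ID is field 0
  StructObjectInspector recIdOI =
      (StructObjectInspector) recIdField.getFieldObjectInspector();
  StructField bucketField = recIdOI.getAllStructFieldRefs().get(1); // bucket is field 1 of ROW__ID
  IntObjectInspector bucketOI = (IntObjectInspector) bucketField.getFieldObjectInspector();

  // Done per row in processOp; when the bucket changes, a new RecordUpdater is opened.
  Object recId = rowOI.getStructFieldData(row, recIdField);
  int bucketNum = bucketOI.get(recIdOI.getStructFieldData(recId, bucketField));
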
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java?rev=1622876&r1=1622875&r2=1622876&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java Sat Sep 6 15:38:43 2014
@@ -164,6 +164,8 @@ public class AcidUtils {
return result;
}
+ public enum Operation { NOT_ACID, INSERT, UPDATE, DELETE }
+
public static interface Directory {
/**
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java?rev=1622876&r1=1622875&r2=1622876&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java Sat Sep 6 15:38:43 2014
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.plan.Fi
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -249,21 +250,8 @@ public final class HiveFileFormatUtils {
public static RecordWriter getHiveRecordWriter(JobConf jc,
TableDesc tableInfo, Class<? extends Writable> outputClass,
FileSinkDesc conf, Path outPath, Reporter reporter) throws HiveException {
- boolean storagehandlerofhivepassthru = false;
- HiveOutputFormat<?, ?> hiveOutputFormat;
+ HiveOutputFormat<?, ?> hiveOutputFormat = getHiveOutputFormat(jc, tableInfo);
try {
- if (tableInfo.getJobProperties() != null) {
- if (tableInfo.getJobProperties().get(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY) != null) {
- jc.set(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY,tableInfo.getJobProperties().get(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY));
- storagehandlerofhivepassthru = true;
- }
- }
- if (storagehandlerofhivepassthru) {
- hiveOutputFormat = ReflectionUtils.newInstance(tableInfo.getOutputFileFormatClass(),jc);
- }
- else {
- hiveOutputFormat = tableInfo.getOutputFileFormatClass().newInstance();
- }
boolean isCompressed = conf.getCompressed();
JobConf jc_output = jc;
if (isCompressed) {
@@ -299,6 +287,73 @@ public final class HiveFileFormatUtils {
return null;
}
+ private static HiveOutputFormat<?, ?> getHiveOutputFormat(JobConf jc, TableDesc tableInfo)
+ throws HiveException {
+ boolean storagehandlerofhivepassthru = false;
+ HiveOutputFormat<?, ?> hiveOutputFormat;
+ try {
+ if (tableInfo.getJobProperties() != null) {
+ if (tableInfo.getJobProperties().get(
+ HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY) != null) {
+ jc.set(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY,
+ tableInfo.getJobProperties()
+ .get(HivePassThroughOutputFormat.HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY));
+ storagehandlerofhivepassthru = true;
+ }
+ }
+ if (storagehandlerofhivepassthru) {
+ return ReflectionUtils.newInstance(tableInfo.getOutputFileFormatClass(), jc);
+ } else {
+ return tableInfo.getOutputFileFormatClass().newInstance();
+ }
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+ }
+
+ public static RecordUpdater getAcidRecordUpdater(JobConf jc, TableDesc tableInfo, int bucket,
+ FileSinkDesc conf, Path outPath,
+ ObjectInspector inspector,
+ Reporter reporter, int rowIdColNum)
+ throws HiveException, IOException {
+ HiveOutputFormat<?, ?> hiveOutputFormat = getHiveOutputFormat(jc, tableInfo);
+ AcidOutputFormat<?, ?> acidOutputFormat = null;
+ if (hiveOutputFormat instanceof AcidOutputFormat) {
+ acidOutputFormat = (AcidOutputFormat)hiveOutputFormat;
+ } else {
+ throw new HiveException("Unable to create RecordUpdater for HiveOutputFormat that does not " +
+ "implement AcidOutputFormat");
+ }
+ // TODO not 100% sure about this. This call doesn't set the compression type in the conf
+ // file the way getHiveRecordWriter does, as ORC appears to read the value for itself. Not
+ // sure if this is correct or not.
+ return getRecordUpdater(jc, acidOutputFormat, conf.getCompressed(), conf.getTransactionId(),
+ bucket, inspector, tableInfo.getProperties(), outPath, reporter, rowIdColNum);
+ }
+
+
+ private static RecordUpdater getRecordUpdater(JobConf jc,
+ AcidOutputFormat<?, ?> acidOutputFormat,
+ boolean isCompressed,
+ long txnId,
+ int bucket,
+ ObjectInspector inspector,
+ Properties tableProp,
+ Path outPath,
+ Reporter reporter,
+ int rowIdColNum) throws IOException {
+ return acidOutputFormat.getRecordUpdater(outPath, new AcidOutputFormat.Options(jc)
+ .isCompressed(isCompressed)
+ .tableProperties(tableProp)
+ .reporter(reporter)
+ .writingBase(false)
+ .minimumTransactionId(txnId)
+ .maximumTransactionId(txnId)
+ .bucket(bucket)
+ .inspector(inspector)
+ .recordIdColumn(rowIdColNum));
+ }
+
public static PartitionDesc getPartitionDescFromPathRecursively(
Map<String, PartitionDesc> pathToPartitionInfo, Path dir,
Map<Map<String, PartitionDesc>, Map<String, PartitionDesc>> cacheMap)
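
A brief usage sketch of the new getAcidRecordUpdater helper, as FileSinkOperator calls it in the diff above. The rowIdColNum argument is -1 for inserts (no record-id column in the row) and 0 for updates/deletes (ROW__ID sits in column 0); the variable names here are only illustrative:

  // INSERT: bucket number derived from the task id, no record-id column.
  fsp.updaters[filesIdx] = HiveFileFormatUtils.getAcidRecordUpdater(
      jc, conf.getTableInfo(), acidBucketNum, conf,
      fsp.outPaths[filesIdx], inspector, reporter, -1);

  // UPDATE/DELETE: bucket number read from ROW__ID, record-id column is 0.
  fpaths.updaters[fpaths.acidFileOffset] = HiveFileFormatUtils.getAcidRecordUpdater(
      jc, conf.getTableInfo(), bucketNum, conf,
      fpaths.outPaths[fpaths.acidFileOffset], rowInspector, reporter, 0);
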
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java?rev=1622876&r1=1622875&r2=1622876&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java Sat Sep 6 15:38:43 2014
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
/**
* FileSinkDesc.
@@ -84,6 +85,10 @@ public class FileSinkDesc extends Abstra
private boolean statsCollectRawDataSize;
+ // Record what type of write this is. Default is non-ACID (i.e. old style).
+ private AcidUtils.Operation writeType = AcidUtils.Operation.NOT_ACID;
+ private long txnId = 0; // transaction id for this operation
+
public FileSinkDesc() {
}
@@ -137,6 +142,8 @@ public class FileSinkDesc extends Abstra
ret.setMaxStatsKeyPrefixLength(maxStatsKeyPrefixLength);
ret.setStatsCollectRawDataSize(statsCollectRawDataSize);
ret.setDpSortState(dpSortState);
+ ret.setWriteType(writeType);
+ ret.setTransactionId(txnId);
return (Object) ret;
}
@@ -398,4 +405,20 @@ public class FileSinkDesc extends Abstra
public void setDpSortState(DPSortState dpSortState) {
this.dpSortState = dpSortState;
}
+
+ public void setWriteType(AcidUtils.Operation type) {
+ writeType = type;
+ }
+
+ public AcidUtils.Operation getWriteType() {
+ return writeType;
+ }
+
+ public void setTransactionId(long id) {
+ txnId = id;
+ }
+
+ public long getTransactionId() {
+ return txnId;
+ }
}
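
The new write-type and transaction-id fields are meant to be filled in by whatever builds the plan for an ACID statement; the test added below sets them directly. A minimal sketch, assuming the FileSinkDesc has otherwise been constructed the usual way:

  // Mark the sink as an ACID insert running in transaction 1.
  FileSinkDesc desc = new FileSinkDesc(basePath, tableDesc, false);
  desc.setWriteType(AcidUtils.Operation.INSERT);
  desc.setTransactionId(1L);

  // FileSinkOperator later consults getWriteType() to choose between RecordWriter
  // and RecordUpdater, and getTransactionId() when calling insert/update/delete.
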
Added: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java?rev=1622876&view=auto
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java (added)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java Sat Sep 6 15:38:43 2014
@@ -0,0 +1,757 @@
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidTxnListImpl;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.io.AcidInputFormat;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.stats.StatsAggregator;
+import org.apache.hadoop.hive.ql.stats.StatsPublisher;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Tests for {@link org.apache.hadoop.hive.ql.exec.FileSinkOperator}
+ */
+public class TestFileSinkOperator {
+ private static String PARTCOL_NAME = "partval";
+ static final private Log LOG = LogFactory.getLog(TestFileSinkOperator.class.getName());
+
+ private static File tmpdir;
+ private static TableDesc nonAcidTableDescriptor;
+ private static TableDesc acidTableDescriptor;
+ private static ObjectInspector inspector;
+ private static List<TFSORow> rows;
+ private static ValidTxnList txnList;
+
+ private Path basePath;
+ private JobConf jc;
+
+ @BeforeClass
+ public static void classSetup() {
+ Properties properties = new Properties();
+ properties.setProperty(serdeConstants.SERIALIZATION_LIB, TFSOSerDe.class.getName());
+ nonAcidTableDescriptor = new TableDesc(TFSOInputFormat.class, TFSOOutputFormat.class, properties);
+ properties = new Properties(properties);
+ properties.setProperty(hive_metastoreConstants.BUCKET_COUNT, "1");
+ acidTableDescriptor = new TableDesc(TFSOInputFormat.class, TFSOOutputFormat.class, properties);
+
+ tmpdir = new File(System.getProperty("java.io.tmpdir") + System.getProperty("file.separator") +
+ "testFileSinkOperator");
+ tmpdir.mkdir();
+ tmpdir.deleteOnExit();
+ txnList = new ValidTxnListImpl(new long[]{}, 2);
+ }
+
+ @Test
+ public void testNonAcidWrite() throws Exception {
+ setBasePath("write");
+ setupData(DataFormat.SIMPLE);
+ FileSinkOperator op = getFileSink(AcidUtils.Operation.NOT_ACID, false, 0);
+ processRows(op);
+ confirmOutput();
+ }
+
+ @Test
+ public void testInsert() throws Exception {
+ setBasePath("insert");
+ setupData(DataFormat.SIMPLE);
+ FileSinkOperator op = getFileSink(AcidUtils.Operation.INSERT, false, 1);
+ processRows(op);
+ Assert.assertEquals("10", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT));
+ confirmOutput();
+ }
+
+ @Test
+ public void testUpdate() throws Exception {
+ setBasePath("update");
+ setupData(DataFormat.WITH_RECORD_ID);
+ FileSinkOperator op = getFileSink(AcidUtils.Operation.UPDATE, false, 2);
+ processRows(op);
+ Assert.assertEquals("0", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT));
+ confirmOutput();
+ }
+
+ @Test
+ public void testDelete() throws Exception {
+ setBasePath("delete");
+ setupData(DataFormat.WITH_RECORD_ID);
+ FileSinkOperator op = getFileSink(AcidUtils.Operation.DELETE, false, 2);
+ processRows(op);
+ Assert.assertEquals("-10", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT));
+ confirmOutput();
+ }
+
+ @Test
+ public void testNonAcidDynamicPartitioning() throws Exception {
+ setBasePath("writeDP");
+ setupData(DataFormat.WITH_PARTITION_VALUE);
+ FileSinkOperator op = getFileSink(AcidUtils.Operation.NOT_ACID, true, 0);
+ processRows(op);
+ confirmOutput();
+ }
+
+
+ @Test
+ public void testInsertDynamicPartitioning() throws Exception {
+ setBasePath("insertDP");
+ setupData(DataFormat.WITH_PARTITION_VALUE);
+ FileSinkOperator op = getFileSink(AcidUtils.Operation.INSERT, true, 1);
+ processRows(op);
+ // We only expect 5 here because we'll get whichever of the partitions published its stats
+ // last.
+ Assert.assertEquals("5", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT));
+ confirmOutput();
+ }
+
+ @Test
+ public void testUpdateDynamicPartitioning() throws Exception {
+ setBasePath("updateDP");
+ setupData(DataFormat.WITH_RECORD_ID_AND_PARTITION_VALUE);
+ FileSinkOperator op = getFileSink(AcidUtils.Operation.UPDATE, true, 2);
+ processRows(op);
+ Assert.assertEquals("0", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT));
+ confirmOutput();
+ }
+
+ @Test
+ public void testDeleteDynamicPartitioning() throws Exception {
+ setBasePath("deleteDP");
+ setupData(DataFormat.WITH_RECORD_ID_AND_PARTITION_VALUE);
+ FileSinkOperator op = getFileSink(AcidUtils.Operation.DELETE, true, 2);
+ processRows(op);
+ // We only expect -5 here because we'll get whichever of the partitions published its stats
+ // last.
+ Assert.assertEquals("-5", TFSOStatsPublisher.stats.get(StatsSetupConst.ROW_COUNT));
+ confirmOutput();
+ }
+
+
+ @Before
+ public void setup() throws Exception {
+ jc = new JobConf();
+ jc.set(StatsSetupConst.STATS_TMP_LOC, File.createTempFile("TestFileSinkOperator",
+ "stats").getPath());
+ jc.set(HiveConf.ConfVars.HIVE_STATS_DEFAULT_PUBLISHER.varname,
+ TFSOStatsPublisher.class.getName());
+ jc.set(HiveConf.ConfVars.HIVE_STATS_DEFAULT_AGGREGATOR.varname,
+ TFSOStatsAggregator.class.getName());
+ jc.set(HiveConf.ConfVars.HIVESTATSDBCLASS.varname, "custom");
+ }
+
+ private void setBasePath(String testName) {
+ basePath = new Path(new File(tmpdir, testName).getPath());
+
+ }
+
+ private enum DataFormat {SIMPLE, WITH_RECORD_ID, WITH_PARTITION_VALUE,
+ WITH_RECORD_ID_AND_PARTITION_VALUE};
+
+ private void setupData(DataFormat format) {
+
+ // Build object inspector
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector
+ (TFSORow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ rows = new ArrayList<TFSORow>();
+
+ switch (format) {
+ case SIMPLE:
+ // Build rows
+ for (int i = 0; i < 10; i++) {
+ rows.add(
+ new TFSORow(
+ new Text("mary had a little lamb")
+ )
+ );
+ }
+ break;
+
+ case WITH_RECORD_ID:
+ for (int i = 0; i < 10; i++) {
+ rows.add(
+ new TFSORow(
+ new Text("its fleect was white as snow"),
+ new RecordIdentifier(1, 1, i)
+ )
+ );
+ }
+ break;
+
+ case WITH_PARTITION_VALUE:
+ for (int i = 0; i < 10; i++) {
+ rows.add(
+ new TFSORow(
+ new Text("its fleect was white as snow"),
+ (i < 5) ? new Text("Monday") : new Text("Tuesday")
+ )
+ );
+ }
+ break;
+
+ case WITH_RECORD_ID_AND_PARTITION_VALUE:
+ for (int i = 0; i < 10; i++) {
+ rows.add(
+ new TFSORow(
+ new Text("its fleect was white as snow"),
+ (i < 5) ? new Text("Monday") : new Text("Tuesday"),
+ new RecordIdentifier(1, 1, i)
+ )
+ );
+ }
+ break;
+
+ default:
+ throw new RuntimeException("Unknown option!");
+ }
+ }
+
+ private FileSinkOperator getFileSink(AcidUtils.Operation writeType,
+ boolean dynamic,
+ long txnId) throws IOException, HiveException {
+ TableDesc tableDesc = null;
+ switch (writeType) {
+ case DELETE:
+ case UPDATE:
+ case INSERT:
+ tableDesc = acidTableDescriptor;
+ break;
+
+ case NOT_ACID:
+ tableDesc = nonAcidTableDescriptor;
+ break;
+ }
+ FileSinkDesc desc = null;
+ if (dynamic) {
+ ArrayList<ExprNodeDesc> partCols = new ArrayList<ExprNodeDesc>(1);
+ partCols.add(new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, PARTCOL_NAME, "a", true));
+ Map<String, String> partColMap= new LinkedHashMap<String, String>(1);
+ partColMap.put(PARTCOL_NAME, null);
+ DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(null, partColMap, "Sunday", 100);
+ Map<String, String> partColNames = new HashMap<String, String>(1);
+ partColNames.put(PARTCOL_NAME, PARTCOL_NAME);
+ dpCtx.setInputToDPCols(partColNames);
+ desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx);
+ } else {
+ desc = new FileSinkDesc(basePath, tableDesc, false);
+ }
+ desc.setWriteType(writeType);
+ desc.setGatherStats(true);
+ if (txnId > 0) desc.setTransactionId(txnId);
+ if (writeType != AcidUtils.Operation.NOT_ACID) desc.setTransactionId(1L);
+
+ FileSinkOperator op = (FileSinkOperator)OperatorFactory.get(FileSinkDesc.class);
+ op.setConf(desc);
+ op.initialize(jc, new ObjectInspector[]{inspector});
+ return op;
+ }
+
+ private void processRows(FileSinkOperator op) throws HiveException {
+ for (TFSORow r : rows) op.processOp(r, 0);
+ op.jobCloseOp(jc, true);
+ op.close(false);
+ }
+
+ private void confirmOutput() throws IOException, SerDeException {
+ Path[] paths = findFilesInBasePath();
+ TFSOInputFormat input = new TFSOInputFormat();
+ FileInputFormat.setInputPaths(jc, paths);
+
+ InputSplit[] splits = input.getSplits(jc, 1);
+ RecordReader<NullWritable, TFSORow> reader = input.getRecordReader(splits[0], jc,
+ Mockito.mock(Reporter.class));
+ NullWritable key = reader.createKey();
+ TFSORow value = reader.createValue();
+ List<TFSORow> results = new ArrayList<TFSORow>(rows.size());
+ List<TFSORow> sortedRows = new ArrayList<TFSORow>(rows.size());
+ for (int i = 0; i < rows.size(); i++) {
+ Assert.assertTrue(reader.next(key, value));
+ results.add(new TFSORow(value));
+ sortedRows.add(new TFSORow(rows.get(i)));
+ }
+ Assert.assertFalse(reader.next(key, value));
+ Collections.sort(results);
+ Collections.sort(sortedRows);
+ for (int i = 0; i < rows.size(); i++) {
+ Assert.assertTrue(sortedRows.get(i).equals(results.get(i)));
+ }
+
+ }
+
+ private Path[] findFilesInBasePath() throws IOException {
+ Path parent = basePath.getParent();
+ String last = basePath.getName();
+ Path tmpPath = new Path(parent, "_tmp." + last);
+ FileSystem fs = basePath.getFileSystem(jc);
+ List<Path> paths = new ArrayList<Path>();
+ recurseOnPath(tmpPath, fs, paths);
+ return paths.toArray(new Path[paths.size()]);
+ }
+
+ private void recurseOnPath(Path p, FileSystem fs, List<Path> paths) throws IOException {
+ if (fs.getFileStatus(p).isDir()) {
+ FileStatus[] stats = fs.listStatus(p);
+ for (FileStatus stat : stats) recurseOnPath(stat.getPath(), fs, paths);
+ } else {
+ paths.add(p);
+ }
+ }
+
+ private static class TFSORow implements WritableComparable<TFSORow> {
+ private RecordIdentifier recId;
+ private Text data;
+ private Text partVal;
+
+ TFSORow() {
+ this(null, null, null);
+ }
+
+ TFSORow(Text t) {
+ this(t, null, null);
+ }
+
+ TFSORow(Text t, Text pv) {
+ this(t, pv, null);
+ }
+
+ TFSORow(Text t, RecordIdentifier ri) {
+ this(t, null, ri);
+ }
+
+ TFSORow(Text t, Text pv, RecordIdentifier ri) {
+ data = t;
+ partVal = pv;
+ recId = ri;
+
+ }
+
+ TFSORow(TFSORow other) {
+ this(other.data, other.partVal, other.recId);
+ }
+
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {
+ data.write(dataOutput);
+ if (partVal == null) {
+ dataOutput.writeBoolean(false);
+ } else {
+ dataOutput.writeBoolean(true);
+ partVal.write(dataOutput);
+ }
+ if (recId == null) {
+ dataOutput.writeBoolean(false);
+ } else {
+ dataOutput.writeBoolean(true);
+ recId.write(dataOutput);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {
+ data = new Text();
+ data.readFields(dataInput);
+ boolean notNull = dataInput.readBoolean();
+ if (notNull) {
+ partVal = new Text();
+ partVal.readFields(dataInput);
+ }
+ notNull = dataInput.readBoolean();
+ if (notNull) {
+ recId = new RecordIdentifier();
+ recId.readFields(dataInput);
+ }
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof TFSORow) {
+ TFSORow other = (TFSORow) obj;
+ if (data == null && other.data == null) return checkPartVal(other);
+ else if (data == null) return false;
+ else if (data.equals(other.data)) return checkPartVal(other);
+ else return false;
+ } else {
+ return false;
+ }
+ }
+
+ private boolean checkPartVal(TFSORow other) {
+ if (partVal == null && other.partVal == null) return checkRecId(other);
+ else if (partVal == null) return false;
+ else if (partVal.equals(other.partVal)) return checkRecId(other);
+ else return false;
+ }
+
+ private boolean checkRecId(TFSORow other) {
+ if (recId == null && other.recId == null) return true;
+ else if (recId == null) return false;
+ else return recId.equals(other.recId);
+ }
+
+ @Override
+ public int compareTo(TFSORow other) {
+ if (recId == null && other.recId == null) {
+ return comparePartVal(other);
+ } else if (recId == null) {
+ return -1;
+ } else {
+ int rc = recId.compareTo(other.recId);
+ if (rc == 0) return comparePartVal(other);
+ else return rc;
+ }
+ }
+
+ private int comparePartVal(TFSORow other) {
+ if (partVal == null && other.partVal == null) {
+ return compareData(other);
+ } else if (partVal == null) {
+ return -1;
+ } else {
+ int rc = partVal.compareTo(other.partVal);
+ if (rc == 0) return compareData(other);
+ else return rc;
+ }
+ }
+
+ private int compareData(TFSORow other) {
+ if (data == null && other.data == null) return 0;
+ else if (data == null) return -1;
+ else return data.compareTo(other.data);
+ }
+ }
+
+ private static class TFSOInputFormat extends FileInputFormat<NullWritable, TFSORow>
+ implements AcidInputFormat<NullWritable, TFSORow> {
+
+ FSDataInputStream in[] = null;
+ int readingFrom = -1;
+
+ @Override
+ public RecordReader<NullWritable, TFSORow> getRecordReader(
+ InputSplit inputSplit, JobConf entries, Reporter reporter) throws IOException {
+ if (in == null) {
+ Path paths[] = FileInputFormat.getInputPaths(entries);
+ in = new FSDataInputStream[paths.length];
+ FileSystem fs = paths[0].getFileSystem(entries);
+ for (int i = 0; i < paths.length; i++) {
+ in[i] = fs.open(paths[i]);
+ }
+ readingFrom = 0;
+ }
+ return new RecordReader<NullWritable, TFSORow>() {
+
+ @Override
+ public boolean next(NullWritable nullWritable, TFSORow tfsoRecord) throws
+ IOException {
+ try {
+ tfsoRecord.readFields(in[readingFrom]);
+ return true;
+ } catch (EOFException e) {
+ in[readingFrom].close();
+ if (++readingFrom >= in.length) return false;
+ else return next(nullWritable, tfsoRecord);
+ }
+ }
+
+ @Override
+ public NullWritable createKey() {
+ return NullWritable.get();
+ }
+
+ @Override
+ public TFSORow createValue() {
+ return new TFSORow();
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return 0L;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return 0.0f;
+ }
+ };
+ }
+
+ @Override
+ public RowReader<TFSORow> getReader(InputSplit split,
+ Options options) throws
+ IOException {
+ return null;
+ }
+
+ @Override
+ public RawReader<TFSORow> getRawReader(Configuration conf,
+ boolean collapseEvents,
+ int bucket,
+ ValidTxnList validTxnList,
+ Path baseDirectory,
+ Path[] deltaDirectory) throws
+ IOException {
+ return null;
+ }
+
+ @Override
+ public boolean validateInput(FileSystem fs, HiveConf conf, ArrayList<FileStatus> files) throws
+ IOException {
+ return false;
+ }
+ }
+
+ public static class TFSOOutputFormat extends FileOutputFormat<NullWritable, TFSORow>
+ implements AcidOutputFormat<NullWritable, TFSORow> {
+ List<TFSORow> records = new ArrayList<TFSORow>();
+ long numRecordsAdded = 0;
+ FSDataOutputStream out = null;
+
+ @Override
+ public RecordUpdater getRecordUpdater(final Path path, final Options options) throws
+ IOException {
+
+ final StructObjectInspector inspector = (StructObjectInspector)options.getInspector();
+ return new RecordUpdater() {
+ @Override
+ public void insert(long currentTransaction, Object row) throws IOException {
+ addRow(row);
+ numRecordsAdded++;
+ }
+
+ @Override
+ public void update(long currentTransaction, Object row) throws IOException {
+ addRow(row);
+ }
+
+ @Override
+ public void delete(long currentTransaction, Object row) throws IOException {
+ addRow(row);
+ numRecordsAdded--;
+ }
+
+ private void addRow(Object row) {
+ assert row instanceof TFSORow : "Expected TFSORow but got " +
+ row.getClass().getName();
+ records.add((TFSORow)row);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (out == null) {
+ FileSystem fs = path.getFileSystem(options.getConfiguration());
+ out = fs.create(path);
+ }
+ for (TFSORow r : records) r.write(out);
+ records.clear();
+ out.flush();
+ }
+
+ @Override
+ public void close(boolean abort) throws IOException {
+ flush();
+ out.close();
+ }
+
+ @Override
+ public SerDeStats getStats() {
+ SerDeStats stats = new SerDeStats();
+ stats.setRowCount(numRecordsAdded);
+ return stats;
+ }
+ };
+ }
+
+ @Override
+ public FileSinkOperator.RecordWriter getRawRecordWriter(Path path,
+ Options options) throws
+ IOException {
+ return null;
+ }
+
+ @Override
+ public FileSinkOperator.RecordWriter getHiveRecordWriter(final JobConf jc,
+ final Path finalOutPath,
+ Class<? extends Writable> valueClass,
+ boolean isCompressed,
+ Properties tableProperties,
+ Progressable progress)
+ throws IOException {
+ return new FileSinkOperator.RecordWriter() {
+ @Override
+ public void write(Writable w) throws IOException {
+ Assert.assertTrue(w instanceof TFSORow);
+ records.add((TFSORow) w);
+ }
+
+ @Override
+ public void close(boolean abort) throws IOException {
+ if (out == null) {
+ FileSystem fs = finalOutPath.getFileSystem(jc);
+ out = fs.create(finalOutPath);
+ }
+ for (TFSORow r : records) r.write(out);
+ records.clear();
+ out.flush();
+ out.close();
+ }
+ };
+ }
+
+ @Override
+ public RecordWriter<NullWritable, TFSORow> getRecordWriter(
+ FileSystem fileSystem, JobConf entries, String s, Progressable progressable) throws
+ IOException {
+ return null;
+ }
+
+ @Override
+ public void checkOutputSpecs(FileSystem fileSystem, JobConf entries) throws IOException {
+
+ }
+ }
+
+ public static class TFSOSerDe implements SerDe {
+
+ @Override
+ public void initialize(Configuration conf, Properties tbl) throws SerDeException {
+
+ }
+
+ @Override
+ public Class<? extends Writable> getSerializedClass() {
+ return TFSORow.class;
+ }
+
+ @Override
+ public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
+ assert obj instanceof TFSORow : "Expected TFSORow or descendant, got "
+ + obj.getClass().getName();
+ return (TFSORow)obj;
+ }
+
+ @Override
+ public Object deserialize(Writable blob) throws SerDeException {
+ assert blob instanceof TFSORow : "Expected TFSORow or descendant, got "
+ + blob.getClass().getName();
+ return blob;
+ }
+
+ @Override
+ public ObjectInspector getObjectInspector() throws SerDeException {
+ return null;
+ }
+
+ @Override
+ public SerDeStats getSerDeStats() {
+ return null;
+ }
+ }
+
+ public static class TFSOStatsPublisher implements StatsPublisher {
+ static Map<String, String> stats;
+
+ @Override
+ public boolean init(Configuration hconf) {
+ return true;
+ }
+
+ @Override
+ public boolean connect(Configuration hconf) {
+ return true;
+ }
+
+ @Override
+ public boolean publishStat(String fileID, Map<String, String> stats) {
+ this.stats = stats;
+ return true;
+ }
+
+ @Override
+ public boolean closeConnection() {
+ return true;
+ }
+ }
+
+ public static class TFSOStatsAggregator implements StatsAggregator {
+
+ @Override
+ public boolean connect(Configuration hconf, Task sourceTask) {
+ return true;
+ }
+
+ @Override
+ public String aggregateStats(String keyPrefix, String statType) {
+ return null;
+ }
+
+ @Override
+ public boolean closeConnection() {
+ return true;
+ }
+
+ @Override
+ public boolean cleanUp(String keyPrefix) {
+ return true;
+ }
+ }
+}