Posted to commits@hive.apache.org by se...@apache.org on 2014/09/16 19:37:19 UTC
svn commit: r1625341 [3/12] - in /hive/branches/llap: ./
common/src/java/org/apache/hadoop/hive/common/
common/src/java/org/apache/hadoop/hive/conf/
contrib/src/test/results/clientpositive/ data/conf/tez/ data/files/
hbase-handler/ itests/hive-unit-had...
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/hooks/ReadEntity.java Tue Sep 16 17:37:13 2014
@@ -48,6 +48,11 @@ public class ReadEntity extends Entity i
// is marked as being read. Defaults to true as that is the most common case.
private boolean needsLock = true;
+ // When true, indicates that this object is being read as part of an update or delete. This is
+ // important because in that case we shouldn't acquire a lock for it or authorize the read.
+ // These will be handled by the output to the table instead.
+ private boolean isUpdateOrDelete = false;
+
// For views, the entities can be nested - by default, entities are at the top level
private final Set<ReadEntity> parents = new HashSet<ReadEntity>();
@@ -166,4 +171,12 @@ public class ReadEntity extends Entity i
public List<String> getAccessedColumns() {
return accessedColumns;
}
+
+ public void setUpdateOrDelete(boolean isUpdateOrDelete) {
+ this.isUpdateOrDelete = isUpdateOrDelete;
+ }
+
+ public boolean isUpdateOrDelete() {
+ return isUpdateOrDelete;
+ }
}
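The new flag is consumed later in this same commit by DbTxnManager.acquireLocks(), which skips the shared read lock for inputs that are also the target of an UPDATE or DELETE. A minimal sketch of that pattern, assuming the surrounding analyzer and lock-manager context (the helper class itself is hypothetical and not part of this patch):

    import org.apache.hadoop.hive.ql.hooks.ReadEntity;

    class UpdateReadLockSketch {
      // Analyzer side: mark the read as belonging to an update/delete rewrite.
      static void markForUpdate(ReadEntity input) {
        input.setUpdateOrDelete(true);
      }

      // Lock-manager side: mirrors the check added to DbTxnManager further down.
      static boolean needsSharedLock(ReadEntity input) {
        return input.needsLock() && !input.isUpdateOrDelete();
      }
    }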
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java Tue Sep 16 17:37:13 2014
@@ -148,6 +148,16 @@ public class WriteEntity extends Entity
}
/**
+ * Only use this if you are very sure of what you are doing. This is used by the
+ * {@link org.apache.hadoop.hive.ql.parse.UpdateDeleteSemanticAnalyzer} to reset the types to
+ * update or delete after rewriting and reparsing the queries.
+ * @param type new operation type
+ */
+ public void setWriteType(WriteType type) {
+ writeType = type;
+ }
+
+ /**
* Equals function.
*/
@Override
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java Tue Sep 16 17:37:13 2014
@@ -26,14 +26,12 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;
@@ -42,24 +40,40 @@ import java.util.regex.Pattern;
* are used by the compactor and cleaner and thus must be format agnostic.
*/
public class AcidUtils {
- private AcidUtils() {
- // NOT USED
- }
- private static final Log LOG = LogFactory.getLog(AcidUtils.class.getName());
-
public static final String BASE_PREFIX = "base_";
public static final String DELTA_PREFIX = "delta_";
+ public static final PathFilter deltaFileFilter = new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ return path.getName().startsWith(DELTA_PREFIX);
+ }
+ };
public static final String BUCKET_PREFIX = "bucket_";
-
+ public static final PathFilter bucketFileFilter = new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ return path.getName().startsWith(BUCKET_PREFIX);
+ }
+ };
public static final String BUCKET_DIGITS = "%05d";
public static final String DELTA_DIGITS = "%07d";
+ public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$");
+ public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{5}");
+ public static final PathFilter originalBucketFilter = new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ return ORIGINAL_PATTERN.matcher(path.getName()).matches();
+ }
+ };
+
+ private AcidUtils() {
+ // NOT USED
+ }
+ private static final Log LOG = LogFactory.getLog(AcidUtils.class.getName());
private static final Pattern ORIGINAL_PATTERN =
Pattern.compile("[0-9]+_[0-9]+");
- public static final Pattern BUCKET_DIGIT_PATTERN = Pattern.compile("[0-9]{5}$");
- public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{5}");
-
public static final PathFilter hiddenFileFilter = new PathFilter(){
public boolean accept(Path p){
String name = p.getName();
@@ -67,13 +81,6 @@ public class AcidUtils {
}
};
- public static final PathFilter bucketFileFilter = new PathFilter() {
- @Override
- public boolean accept(Path path) {
- return path.getName().startsWith(BUCKET_PREFIX);
- }
- };
-
private static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
/**
@@ -149,7 +156,7 @@ public class AcidUtils {
.minimumTransactionId(0)
.maximumTransactionId(0)
.bucket(bucket);
- } else if (filename.startsWith(AcidUtils.BUCKET_PREFIX)) {
+ } else if (filename.startsWith(BUCKET_PREFIX)) {
int bucket =
Integer.parseInt(filename.substring(filename.indexOf('_') + 1));
result
@@ -372,7 +379,8 @@ public class AcidUtils {
}
final Path base = bestBase == null ? null : bestBase.getPath();
- LOG.debug("base = " + base + " deltas = " + deltas.size());
+ LOG.debug("in directory " + directory.toUri().toString() + " base = " + base + " deltas = " +
+ deltas.size());
return new Directory(){
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CompressionCodec.java Tue Sep 16 17:37:13 2014
@@ -19,8 +19,20 @@ package org.apache.hadoop.hive.ql.io.orc
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.EnumSet;
interface CompressionCodec {
+
+ public enum Modifier {
+ /* speed/compression tradeoffs */
+ FASTEST,
+ FAST,
+ DEFAULT,
+ /* data sensitivity modifiers */
+ TEXT,
+ BINARY
+ };
+
/**
* Compress the in buffer to the out buffer.
* @param in the bytes to compress
@@ -39,4 +51,17 @@ interface CompressionCodec {
* @throws IOException
*/
void decompress(ByteBuffer in, ByteBuffer out) throws IOException;
+
+ /**
+ * Produce a modified compression codec if the underlying algorithm allows
+ * modification.
+ *
+ * This does not modify the current object, but returns a new object if
+ * modifications are possible. Returns the same object if no modifications
+ * are possible.
+ * @param modifiers compression modifiers
+ * @return codec for use after optional modification
+ */
+ CompressionCodec modify(EnumSet<Modifier> modifiers);
+
}
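The modify() contract above is exercised later in this commit by WriterImpl.createStream(), which picks modifiers per stream kind and asks the codec for a tuned variant. A sketch of that caller pattern (placed in the same package, since CompressionCodec is package-private; the helper itself is hypothetical):

    import java.util.EnumSet;

    class CodecModifySketch {
      // Returns a codec tuned for the given modifiers, or null when compression is off.
      // modify() must not mutate the receiver: ZlibCodec returns a new instance,
      // SnappyCodec returns itself unchanged.
      static CompressionCodec tunedFor(CompressionCodec codec,
                                       EnumSet<CompressionCodec.Modifier> modifiers) {
        return codec == null ? null : codec.modify(modifiers);
      }
    }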
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java Tue Sep 16 17:37:13 2014
@@ -101,6 +101,10 @@ public final class OrcFile {
SPEED, COMPRESSION;
}
+ public static enum CompressionStrategy {
+ SPEED, COMPRESSION;
+ }
+
// Note : these string definitions for table properties are deprecated,
// and retained only for backward compatibility, please do not add to
// them, add to OrcTableProperties below instead
@@ -230,6 +234,7 @@ public final class OrcFile {
private Version versionValue;
private WriterCallback callback;
private EncodingStrategy encodingStrategy;
+ private CompressionStrategy compressionStrategy;
private float paddingTolerance;
WriterOptions(Configuration conf) {
@@ -254,6 +259,15 @@ public final class OrcFile {
} else {
encodingStrategy = EncodingStrategy.valueOf(enString);
}
+
+ String compString = conf
+ .get(HiveConf.ConfVars.HIVE_ORC_COMPRESSION_STRATEGY.varname);
+ if (compString == null) {
+ compressionStrategy = CompressionStrategy.SPEED;
+ } else {
+ compressionStrategy = CompressionStrategy.valueOf(compString);
+ }
+
paddingTolerance =
conf.getFloat(HiveConf.ConfVars.HIVE_ORC_BLOCK_PADDING_TOLERANCE.varname,
HiveConf.ConfVars.HIVE_ORC_BLOCK_PADDING_TOLERANCE.defaultFloatVal);
@@ -403,7 +417,8 @@ public final class OrcFile {
opts.bufferSizeValue, opts.rowIndexStrideValue,
opts.memoryManagerValue, opts.blockPaddingValue,
opts.versionValue, opts.callback,
- opts.encodingStrategy, opts.paddingTolerance,
+ opts.encodingStrategy, opts.compressionStrategy,
+ opts.paddingTolerance,
opts.blockSizeValue);
}
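For reference, a sketch of how the new strategy is selected through configuration. HIVE_ORC_COMPRESSION_STRATEGY is the ConfVar added elsewhere in this commit (its property string is not shown in this hunk), and an unset value falls back to SPEED, as in the WriterOptions constructor above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.conf.HiveConf;

    Configuration conf = new Configuration();
    conf.set(HiveConf.ConfVars.HIVE_ORC_COMPRESSION_STRATEGY.varname,
             OrcFile.CompressionStrategy.COMPRESSION.name());
    // WriterOptions reads the value in its constructor; anything unset means SPEED.
    OrcFile.WriterOptions opts = OrcFile.writerOptions(conf);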
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeInputFormat.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeInputFormat.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFileStripeMergeInputFormat.java Tue Sep 16 17:37:13 2014
@@ -18,16 +18,16 @@
package org.apache.hadoop.hive.ql.io.orc;
-import java.io.IOException;
-
-import org.apache.hadoop.hive.ql.io.merge.MergeInputFormat;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
-public class OrcFileStripeMergeInputFormat extends MergeInputFormat {
+import java.io.IOException;
+
+public class OrcFileStripeMergeInputFormat extends MergeFileInputFormat {
@Override
public RecordReader<OrcFileKeyWrapper, OrcFileValueWrapper> getRecordReader(
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SnappyCodec.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SnappyCodec.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SnappyCodec.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SnappyCodec.java Tue Sep 16 17:37:13 2014
@@ -25,6 +25,7 @@ import org.iq80.snappy.Snappy;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.EnumSet;
class SnappyCodec implements CompressionCodec, DirectDecompressionCodec {
@@ -99,4 +100,10 @@ class SnappyCodec implements Compression
decompressShim.decompress(in, out);
out.flip(); // flip for read
}
+
+ @Override
+ public CompressionCodec modify(EnumSet<Modifier> modifiers) {
+ // snappy allows no modifications
+ return this;
+ }
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java Tue Sep 16 17:37:13 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io.orc
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.List;
/**
* The interface for writing ORC files.
@@ -72,4 +73,30 @@ public interface Writer {
* @return the offset that would be a valid end location for an ORC file
*/
long writeIntermediateFooter() throws IOException;
+
+ /**
+ * Fast stripe append to ORC file. This interface is used for fast ORC file
+ * merge with other ORC files. When merging, the file to be merged should pass each
+ * stripe in binary form along with its stripe information and stripe statistics.
+ * After appending the last stripe of a file, use appendUserMetadata() to append
+ * any user metadata.
+ * @param stripe - stripe as byte array
+ * @param offset - offset within byte array
+ * @param length - length of stripe within byte array
+ * @param stripeInfo - stripe information
+ * @param stripeStatistics - stripe statistics (Protobuf objects can be
+ * merged directly)
+ * @throws IOException
+ */
+ public void appendStripe(byte[] stripe, int offset, int length,
+ StripeInformation stripeInfo,
+ OrcProto.StripeStatistics stripeStatistics) throws IOException;
+
+ /**
+ * When fast stripe append is used for merging ORC stripes, after appending
+ * the last stripe from a file, this interface must be used to merge any
+ * user metadata.
+ * @param userMetadata - user metadata
+ */
+ public void appendUserMetadata(List<OrcProto.UserMetadataItem> userMetadata);
}
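Taken together, the two methods define the fast-merge loop. A hedged sketch of a caller, assuming the stripe bytes, StripeInformation and StripeStatistics have already been read out of the source file (the reader-side API is not part of this hunk) and that the class lives in the same package as Writer:

    import java.io.IOException;
    import java.util.List;

    class StripeMergeSketch {
      // Appends all pre-read stripes from one source file, then carries over its user metadata.
      static void appendSourceFile(Writer writer,
                                   List<byte[]> stripes,
                                   List<StripeInformation> infos,
                                   List<OrcProto.StripeStatistics> stats,
                                   List<OrcProto.UserMetadataItem> userMeta) throws IOException {
        for (int i = 0; i < stripes.size(); i++) {
          byte[] buf = stripes.get(i);
          writer.appendStripe(buf, 0, buf.length, infos.get(i), stats.get(i));
        }
        // Must come after the last stripe of this source file.
        writer.appendUserMetadata(userMeta);
      }
    }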
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Tue Sep 16 17:37:13 2014
@@ -24,10 +24,15 @@ import java.lang.management.ManagementFa
import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -37,6 +42,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.orc.CompressionCodec.Modifier;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile.CompressionStrategy;
import org.apache.hadoop.hive.ql.io.orc.OrcFile.EncodingStrategy;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.StripeStatistics;
@@ -71,10 +78,17 @@ import org.apache.hadoop.hive.serde2.typ
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.CodedOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.management.ManagementFactory;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static com.google.common.base.Preconditions.checkArgument;
/**
* An ORC file writer. The file is divided into stripes, which is the natural
@@ -139,6 +153,7 @@ class WriterImpl implements Writer, Memo
private final OrcFile.WriterCallback callback;
private final OrcFile.WriterContext callbackContext;
private final OrcFile.EncodingStrategy encodingStrategy;
+ private final OrcFile.CompressionStrategy compressionStrategy;
WriterImpl(FileSystem fs,
Path path,
@@ -153,6 +168,7 @@ class WriterImpl implements Writer, Memo
OrcFile.Version version,
OrcFile.WriterCallback callback,
OrcFile.EncodingStrategy encodingStrategy,
+ CompressionStrategy compressionStrategy,
float paddingTolerance,
long blockSizeValue) throws IOException {
this.fs = fs;
@@ -174,6 +190,7 @@ class WriterImpl implements Writer, Memo
this.defaultStripeSize = stripeSize;
this.version = version;
this.encodingStrategy = encodingStrategy;
+ this.compressionStrategy = compressionStrategy;
this.addBlockPadding = addBlockPadding;
this.blockSize = blockSizeValue;
this.paddingTolerance = paddingTolerance;
@@ -447,10 +464,35 @@ class WriterImpl implements Writer, Memo
public OutStream createStream(int column,
OrcProto.Stream.Kind kind
) throws IOException {
- StreamName name = new StreamName(column, kind);
+ final StreamName name = new StreamName(column, kind);
+ final EnumSet<CompressionCodec.Modifier> modifiers;
+
+ switch (kind) {
+ case DATA:
+ case DICTIONARY_DATA:
+ if (getCompressionStrategy() == CompressionStrategy.SPEED) {
+ modifiers = EnumSet.of(Modifier.FAST, Modifier.TEXT);
+ } else {
+ modifiers = EnumSet.of(Modifier.DEFAULT, Modifier.TEXT);
+ }
+ break;
+ case LENGTH:
+ case DICTIONARY_COUNT:
+ case PRESENT:
+ case ROW_INDEX:
+ case SECONDARY:
+ // easily compressed using the fastest modes
+ modifiers = EnumSet.of(Modifier.FASTEST, Modifier.BINARY);
+ break;
+ default:
+ modifiers = null;
+ break;
+ }
+
BufferedStream result = streams.get(name);
if (result == null) {
- result = new BufferedStream(name.toString(), bufferSize, codec);
+ result = new BufferedStream(name.toString(), bufferSize,
+ codec == null ? codec : codec.modify(modifiers));
streams.put(name, result);
}
return result.outStream;
@@ -496,6 +538,14 @@ class WriterImpl implements Writer, Memo
}
/**
+ * Get the compression strategy to use.
+ * @return compression strategy
+ */
+ public CompressionStrategy getCompressionStrategy() {
+ return compressionStrategy;
+ }
+
+ /**
* Get the writer's configuration.
* @return configuration
*/
@@ -2277,17 +2327,19 @@ class WriterImpl implements Writer, Memo
return rawWriter.getPos();
}
- void appendStripe(byte[] stripe, StripeInformation stripeInfo,
- OrcProto.StripeStatistics stripeStatistics) throws IOException {
- appendStripe(stripe, 0, stripe.length, stripeInfo, stripeStatistics);
- }
-
- void appendStripe(byte[] stripe, int offset, int length,
+ @Override
+ public void appendStripe(byte[] stripe, int offset, int length,
StripeInformation stripeInfo,
OrcProto.StripeStatistics stripeStatistics) throws IOException {
+ checkArgument(stripe != null, "Stripe must not be null");
+ checkArgument(length <= stripe.length,
+ "Specified length must not be greater than the specified array length");
+ checkArgument(stripeInfo != null, "Stripe information must not be null");
+ checkArgument(stripeStatistics != null,
+ "Stripe statistics must not be null");
+
getStream();
long start = rawWriter.getPos();
-
long stripeLen = length;
long availBlockSpace = blockSize - (start % blockSize);
@@ -2343,7 +2395,8 @@ class WriterImpl implements Writer, Memo
}
}
- void appendUserMetadata(List<UserMetadataItem> userMetadata) {
+ @Override
+ public void appendUserMetadata(List<UserMetadataItem> userMetadata) {
if (userMetadata != null) {
for (UserMetadataItem item : userMetadata) {
this.userMetadata.put(item.getName(), item.getValue());
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ZlibCodec.java Tue Sep 16 17:37:13 2014
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.io.orc
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.EnumSet;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;
@@ -32,10 +33,24 @@ class ZlibCodec implements CompressionCo
private Boolean direct = null;
+ private final int level;
+ private final int strategy;
+
+ public ZlibCodec() {
+ level = Deflater.DEFAULT_COMPRESSION;
+ strategy = Deflater.DEFAULT_STRATEGY;
+ }
+
+ private ZlibCodec(int level, int strategy) {
+ this.level = level;
+ this.strategy = strategy;
+ }
+
@Override
public boolean compress(ByteBuffer in, ByteBuffer out,
ByteBuffer overflow) throws IOException {
- Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true);
+ Deflater deflater = new Deflater(level, true);
+ deflater.setStrategy(strategy);
int length = in.remaining();
deflater.setInput(in.array(), in.arrayOffset() + in.position(), length);
deflater.finish();
@@ -113,4 +128,37 @@ class ZlibCodec implements CompressionCo
decompressShim.decompress(in, out);
out.flip(); // flip for read
}
+
+ @Override
+ public CompressionCodec modify(EnumSet<Modifier> modifiers) {
+ int l = this.level;
+ int s = this.strategy;
+
+ for (Modifier m : modifiers) {
+ switch (m) {
+ case BINARY:
+ /* filtered == less LZ77, more huffman */
+ s = Deflater.FILTERED;
+ break;
+ case TEXT:
+ s = Deflater.DEFAULT_STRATEGY;
+ break;
+ case FASTEST:
+ // deflate_fast looking for 8 byte patterns
+ l = Deflater.BEST_SPEED;
+ break;
+ case FAST:
+ // deflate_fast looking for 16 byte patterns
+ l = Deflater.BEST_SPEED + 1;
+ break;
+ case DEFAULT:
+ // deflate_slow looking for 128 byte patterns
+ l = Deflater.DEFAULT_COMPRESSION;
+ break;
+ default:
+ break;
+ }
+ }
+ return new ZlibCodec(l, s);
+ }
}
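As a usage note, the modifiers map directly onto java.util.zip.Deflater settings: FASTEST/FAST/DEFAULT adjust the level, BINARY selects the FILTERED strategy and TEXT the default strategy, and the original codec is never mutated. A small sketch (same package assumed):

    import java.util.EnumSet;

    // The FASTEST+BINARY combination WriterImpl uses for index/present streams becomes
    // Deflater.BEST_SPEED with the FILTERED strategy on a brand-new ZlibCodec instance.
    CompressionCodec base = new ZlibCodec();
    CompressionCodec indexCodec = base.modify(
        EnumSet.of(CompressionCodec.Modifier.FASTEST, CompressionCodec.Modifier.BINARY));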
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileBlockMergeInputFormat.java Tue Sep 16 17:37:13 2014
@@ -20,14 +20,14 @@ package org.apache.hadoop.hive.ql.io.rcf
import java.io.IOException;
-import org.apache.hadoop.hive.ql.io.merge.MergeInputFormat;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
-public class RCFileBlockMergeInputFormat extends MergeInputFormat {
+public class RCFileBlockMergeInputFormat extends MergeFileInputFormat {
@Override
public RecordReader<RCFileKeyBufferWrapper, RCFileValueBufferWrapper>
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java Tue Sep 16 17:37:13 2014
@@ -53,11 +53,12 @@ public class DbTxnManager extends HiveTx
}
@Override
- public void openTxn(String user) throws LockException {
+ public long openTxn(String user) throws LockException {
init();
try {
txnId = client.openTxn(user);
LOG.debug("Opened txn " + txnId);
+ return txnId;
} catch (TException e) {
throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(),
e);
@@ -88,7 +89,11 @@ public class DbTxnManager extends HiveTx
// For each source to read, get a shared lock
for (ReadEntity input : plan.getInputs()) {
- if (!input.needsLock()) continue;
+ if (!input.needsLock() || input.isUpdateOrDelete()) {
+ // We don't want to acquire read locks during update or delete as we'll be acquiring write
+ // locks instead.
+ continue;
+ }
LockComponentBuilder compBuilder = new LockComponentBuilder();
compBuilder.setShared();
@@ -297,6 +302,11 @@ public class DbTxnManager extends HiveTx
}
@Override
+ public boolean supportsAcid() {
+ return true;
+ }
+
+ @Override
protected void destruct() {
try {
if (txnId > 0) rollbackTxn();
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java Tue Sep 16 17:37:13 2014
@@ -48,8 +48,9 @@ class DummyTxnManager extends HiveTxnMan
private HiveLockManager lockMgr;
@Override
- public void openTxn(String user) throws LockException {
+ public long openTxn(String user) throws LockException {
// No-op
+ return 0L;
}
@Override
@@ -208,6 +209,11 @@ class DummyTxnManager extends HiveTxnMan
return false;
}
+ @Override
+ public boolean supportsAcid() {
+ return false;
+ }
+
protected void destruct() {
if (lockMgr != null) {
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java Tue Sep 16 17:37:13 2014
@@ -32,9 +32,10 @@ public interface HiveTxnManager {
/**
* Open a new transaction.
* @param user Hive user who is opening this transaction.
+ * @return The new transaction id
* @throws LockException if a transaction is already open.
*/
- void openTxn(String user) throws LockException;
+ long openTxn(String user) throws LockException;
/**
* Get the lock manager. This must be used rather than instantiating an
@@ -120,4 +121,10 @@ public interface HiveTxnManager {
* @return true if the new format should be used.
*/
boolean useNewShowLocksFormat();
+
+ /**
+ * Indicate whether this transaction manager supports ACID operations
+ * @return true if this transaction manager does ACID
+ */
+ boolean supportsAcid();
}
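The signature change lets callers capture the transaction id at open time, and supportsAcid() lets them reject ACID statements under the legacy lock-based manager. A hypothetical caller sketch; only openTxn() and supportsAcid() come from this patch:

    // txnMgr and userName are assumed to come from the session.
    long txnId = txnMgr.openTxn(userName);  // DbTxnManager returns the metastore txn id, Dummy returns 0
    if (!txnMgr.supportsAcid()) {
      // UPDATE/DELETE cannot be executed: fail early rather than write bad data.
    } else {
      // Push txnId into the ACID FileSinkDescs collected during analysis (see the
      // acidFileSinks set added to BaseSemanticAnalyzer below) before execution starts.
    }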
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Tue Sep 16 17:37:13 2014
@@ -96,6 +96,7 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
import org.apache.hadoop.hive.ql.plan.DropTableDesc;
@@ -1227,7 +1228,7 @@ public class Hive {
public void loadPartition(Path loadPath, String tableName,
Map<String, String> partSpec, boolean replace, boolean holdDDLTime,
boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
- boolean isSrcLocal) throws HiveException {
+ boolean isSrcLocal, boolean isAcid) throws HiveException {
Table tbl = getTable(tableName);
Path tblDataLocationPath = tbl.getDataLocation();
try {
@@ -1275,7 +1276,7 @@ public class Hive {
isSrcLocal);
} else {
FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
- Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal);
+ Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid);
}
// recreate the partition if it existed before
@@ -1407,7 +1408,7 @@ private void constructOneLBLocationMap(F
*/
public ArrayList<LinkedHashMap<String, String>> loadDynamicPartitions(Path loadPath,
String tableName, Map<String, String> partSpec, boolean replace,
- int numDP, boolean holdDDLTime, boolean listBucketingEnabled)
+ int numDP, boolean holdDDLTime, boolean listBucketingEnabled, boolean isAcid)
throws HiveException {
Set<Path> validPartitions = new HashSet<Path>();
@@ -1463,7 +1464,7 @@ private void constructOneLBLocationMap(F
// finally load the partition -- move the file to the final table address
loadPartition(partPath, tableName, fullPartSpec, replace, holdDDLTime, true,
- listBucketingEnabled, false);
+ listBucketingEnabled, false, isAcid);
LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
}
return fullPartSpecs;
@@ -1489,14 +1490,16 @@ private void constructOneLBLocationMap(F
* If the source directory is LOCAL
* @param isSkewedStoreAsSubdir
* if list bucketing enabled
+ * @param isAcid true if this is an ACID based write
*/
public void loadTable(Path loadPath, String tableName, boolean replace,
- boolean holdDDLTime, boolean isSrcLocal, boolean isSkewedStoreAsSubdir) throws HiveException {
+ boolean holdDDLTime, boolean isSrcLocal, boolean isSkewedStoreAsSubdir, boolean isAcid)
+ throws HiveException {
Table tbl = getTable(tableName);
if (replace) {
tbl.replaceFiles(loadPath, isSrcLocal);
} else {
- tbl.copyFiles(loadPath, isSrcLocal);
+ tbl.copyFiles(loadPath, isSrcLocal, isAcid);
}
try {
@@ -2313,8 +2316,19 @@ private void constructOneLBLocationMap(F
return success;
}
+ /**
+ * Copy files. This handles building the mapping for buckets and such between the source and
+ * destination
+ * @param conf Configuration object
+ * @param srcf source directory, if bucketed should contain bucket files
+ * @param destf directory to move files into
+ * @param fs Filesystem
+ * @param isSrcLocal true if source is on local file system
+ * @param isAcid true if this is an ACID based write
+ * @throws HiveException
+ */
static protected void copyFiles(HiveConf conf, Path srcf, Path destf,
- FileSystem fs, boolean isSrcLocal) throws HiveException {
+ FileSystem fs, boolean isSrcLocal, boolean isAcid) throws HiveException {
boolean inheritPerms = HiveConf.getBoolVar(conf,
HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
try {
@@ -2342,23 +2356,105 @@ private void constructOneLBLocationMap(F
return;
// srcs = new FileStatus[0]; Why is this needed?
}
+
+ // If we're moving files around for an ACID write then the rules and paths are all different.
+ // You can blame this on Owen.
+ if (isAcid) {
+ moveAcidFiles(srcFs, srcs, destf);
+ } else {
// check that source and target paths exist
- List<List<Path[]>> result = checkPaths(conf, fs, srcs, srcFs, destf, false);
- // move it, move it
- try {
- for (List<Path[]> sdpairs : result) {
- for (Path[] sdpair : sdpairs) {
- if (!renameFile(conf, sdpair[0], sdpair[1], fs, false, isSrcLocal)) {
- throw new IOException("Cannot move " + sdpair[0] + " to "
- + sdpair[1]);
+ List<List<Path[]>> result = checkPaths(conf, fs, srcs, srcFs, destf, false);
+ // move it, move it
+ try {
+ for (List<Path[]> sdpairs : result) {
+ for (Path[] sdpair : sdpairs) {
+ if (!renameFile(conf, sdpair[0], sdpair[1], fs, false, isSrcLocal)) {
+ throw new IOException("Cannot move " + sdpair[0] + " to "
+ + sdpair[1]);
+ }
+ }
+ }
+ } catch (IOException e) {
+ throw new HiveException("copyFiles: error while moving files!!!", e);
+ }
+ }
+ }
+
+ private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst)
+ throws HiveException {
+ // The layout for ACID files is table|partname/base|delta/bucket
+ // We will always only be writing delta files. In the buckets created by FileSinkOperator
+ // it will look like bucket/delta/bucket. So we need to move that into the above structure.
+ // For the first mover there will be no delta directory, so we can move the whole directory.
+ // For everyone else we will need to just move the buckets under the existing delta
+ // directory.
+
+ Set<Path> createdDeltaDirs = new HashSet<Path>();
+ // Open the original path we've been given and find the list of original buckets
+ for (FileStatus stat : stats) {
+ Path srcPath = stat.getPath();
+
+ LOG.debug("Acid move Looking for original buckets in " + srcPath);
+
+ FileStatus[] origBucketStats = null;
+ try {
+ origBucketStats = fs.listStatus(srcPath, AcidUtils.originalBucketFilter);
+ } catch (IOException e) {
+ String msg = "Unable to look for bucket files in src path " + srcPath.toUri().toString();
+ LOG.error(msg);
+ throw new HiveException(msg, e);
+ }
+ LOG.debug("Acid move found " + origBucketStats.length + " original buckets");
+
+ for (FileStatus origBucketStat : origBucketStats) {
+ Path origBucketPath = origBucketStat.getPath();
+ LOG.debug("Acid move looking for delta files in bucket " + origBucketPath);
+
+ FileStatus[] deltaStats = null;
+ try {
+ deltaStats = fs.listStatus(origBucketPath, AcidUtils.deltaFileFilter);
+ } catch (IOException e) {
+ throw new HiveException("Unable to look for delta files in original bucket " +
+ origBucketPath.toUri().toString(), e);
+ }
+ LOG.debug("Acid move found " + deltaStats.length + " delta files");
+
+ for (FileStatus deltaStat : deltaStats) {
+ Path deltaPath = deltaStat.getPath();
+ // Create the delta directory. Don't worry if it already exists,
+ // as that likely means another task got to it first. Then move each of the buckets.
+ // It would be more efficient to try to move the delta with its buckets but that is
+ // harder to make race condition proof.
+ Path deltaDest = new Path(dst, deltaPath.getName());
+ try {
+ if (!createdDeltaDirs.contains(deltaDest)) {
+ try {
+ fs.mkdirs(deltaDest);
+ createdDeltaDirs.add(deltaDest);
+ } catch (IOException swallowIt) {
+ // Don't worry about this, as it likely just means it's already been created.
+ LOG.info("Unable to create delta directory " + deltaDest +
+ ", assuming it already exists: " + swallowIt.getMessage());
+ }
+ }
+ FileStatus[] bucketStats = fs.listStatus(deltaPath, AcidUtils.bucketFileFilter);
+ LOG.debug("Acid move found " + bucketStats.length + " bucket files");
+ for (FileStatus bucketStat : bucketStats) {
+ Path bucketSrc = bucketStat.getPath();
+ Path bucketDest = new Path(deltaDest, bucketSrc.getName());
+ LOG.info("Moving bucket " + bucketSrc.toUri().toString() + " to " +
+ bucketDest.toUri().toString());
+ fs.rename(bucketSrc, bucketDest);
+ }
+ } catch (IOException e) {
+ throw new HiveException("Error moving acid files", e);
}
}
}
- } catch (IOException e) {
- throw new HiveException("copyFiles: error while moving files!!!", e);
}
}
+
/**
* Replaces files in the partition with new data set specified by srcf. Works
* by renaming directory of srcf to the destination file.
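To make the moveAcidFiles() reshuffle concrete, an illustrative layout (the paths and transaction id are hypothetical; the 7-digit padding follows AcidUtils.DELTA_DIGITS and the bucket file names follow BUCKET_PREFIX/BUCKET_DIGITS):

    // FileSinkOperator leaves its output under the staging directory as
    //     <staging>/000000_0/delta_0000007_0000007/bucket_00000
    // and moveAcidFiles() relocates each bucket file so the table or partition ends up with
    //     <table-or-partition>/delta_0000007_0000007/bucket_00000
    // The destination delta directory is created at most once per set of moves; a failed
    // mkdirs is swallowed because a concurrent task may already have created it.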
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java Tue Sep 16 17:37:13 2014
@@ -659,12 +659,14 @@ public class Table implements Serializab
* Files to be moved. Leaf directories or globbed file paths
* @param isSrcLocal
* If the source directory is LOCAL
+ * @param isAcid
+ * True if this is an ACID based insert, update, or delete
*/
- protected void copyFiles(Path srcf, boolean isSrcLocal) throws HiveException {
+ protected void copyFiles(Path srcf, boolean isSrcLocal, boolean isAcid) throws HiveException {
FileSystem fs;
try {
fs = getDataLocation().getFileSystem(Hive.get().getConf());
- Hive.copyFiles(Hive.get().getConf(), srcf, getPath(), fs, isSrcLocal);
+ Hive.copyFiles(Hive.get().getConf(), srcf, getPath(), fs, isSrcLocal, isAcid);
} catch (IOException e) {
throw new HiveException("addFiles: filesystem error in check phase", e);
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java Tue Sep 16 17:37:13 2014
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.exec.Re
import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.SelectOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -371,6 +372,12 @@ public class BucketingSortingReduceSinkO
return null;
}
+ // Don't do this optimization with updates or deletes
+ if (pGraphContext.getContext().getAcidOperation() == AcidUtils.Operation.UPDATE ||
+ pGraphContext.getContext().getAcidOperation() == AcidUtils.Operation.DELETE){
+ return null;
+ }
+
// Support for dynamic partitions can be added later
if (fsOp.getConf().getDynPartCtx() != null) {
return null;
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Tue Sep 16 17:37:13 2014
@@ -18,20 +18,6 @@
package org.apache.hadoop.hive.ql.optimizer;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
@@ -53,6 +39,8 @@ import org.apache.hadoop.hive.ql.exec.No
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.OrcFileMergeOperator;
+import org.apache.hadoop.hive.ql.exec.RCFileMergeOperator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
@@ -65,8 +53,10 @@ import org.apache.hadoop.hive.ql.exec.mr
import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
-import org.apache.hadoop.hive.ql.io.merge.MergeWork;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
+import org.apache.hadoop.hive.ql.io.orc.OrcFileStripeMergeInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileBlockMergeInputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRUnionCtx;
@@ -88,6 +78,7 @@ import org.apache.hadoop.hive.ql.plan.Co
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.FetchWork;
+import org.apache.hadoop.hive.ql.plan.FileMergeDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;
import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
@@ -96,8 +87,10 @@ import org.apache.hadoop.hive.ql.plan.Ma
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.OrcFileMergeDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.RCFileMergeDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.StatsWork;
@@ -106,6 +99,22 @@ import org.apache.hadoop.hive.ql.plan.Ta
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
/**
* General utility common functions for the Processor to convert operator into
@@ -1250,33 +1259,20 @@ public final class GenMapRedUtils {
(conf.getBoolVar(ConfVars.HIVEMERGEORCFILESTRIPELEVEL) &&
fsInputDesc.getTableInfo().getInputFileFormatClass().equals(OrcInputFormat.class))) {
- // Check if InputFormatClass is valid
- final String inputFormatClass;
- if (fsInputDesc.getTableInfo().getInputFileFormatClass().equals(RCFileInputFormat.class)) {
- inputFormatClass = conf.getVar(ConfVars.HIVEMERGEINPUTFORMATBLOCKLEVEL);
+ cplan = GenMapRedUtils.createMergeTask(fsInputDesc, finalName,
+ dpCtx != null && dpCtx.getNumDPCols() > 0);
+ if (conf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+ work = new TezWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID));
+ cplan.setName("Tez Merge File Work");
+ ((TezWork) work).add(cplan);
} else {
- inputFormatClass = conf.getVar(ConfVars.HIVEMERGEINPUTFORMATSTRIPELEVEL);
- }
- try {
- Class c = Class.forName(inputFormatClass);
-
- if(fsInputDesc.getTableInfo().getInputFileFormatClass().equals(OrcInputFormat.class)) {
- LOG.info("OrcFile format - Using stripe level merge");
- } else {
- LOG.info("RCFile format- Using block level merge");
- }
- cplan = GenMapRedUtils.createMergeTask(fsInputDesc, finalName,
- dpCtx != null && dpCtx.getNumDPCols() > 0);
work = cplan;
- } catch (ClassNotFoundException e) {
- String msg = "Illegal input format class: " + inputFormatClass;
- throw new SemanticException(msg);
}
} else {
cplan = createMRWorkForMergingFiles(conf, tsMerge, fsInputDesc);
if (conf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
work = new TezWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID));
- cplan.setName("Merge");
+ cplan.setName("Tez Merge File Work");
((TezWork)work).add(cplan);
} else {
work = new MapredWork();
@@ -1489,6 +1485,7 @@ public final class GenMapRedUtils {
*
* @param fsInputDesc
* @param finalName
+ * @param inputFormatClass
* @return MergeWork if table is stored as RCFile or ORCFile,
* null otherwise
*/
@@ -1498,38 +1495,62 @@ public final class GenMapRedUtils {
Path inputDir = fsInputDesc.getFinalDirName();
TableDesc tblDesc = fsInputDesc.getTableInfo();
- if (tblDesc.getInputFileFormatClass().equals(RCFileInputFormat.class) ||
- tblDesc.getInputFileFormatClass().equals(OrcInputFormat.class)) {
- ArrayList<Path> inputDirs = new ArrayList<Path>(1);
- ArrayList<String> inputDirstr = new ArrayList<String>(1);
- if (!hasDynamicPartitions
- && !GenMapRedUtils.isSkewedStoredAsDirs(fsInputDesc)) {
- inputDirs.add(inputDir);
- inputDirstr.add(inputDir.toString());
- }
-
- MergeWork work = new MergeWork(inputDirs, finalName,
- hasDynamicPartitions, fsInputDesc.getDynPartCtx(),
- tblDesc.getInputFileFormatClass());
- LinkedHashMap<String, ArrayList<String>> pathToAliases =
- new LinkedHashMap<String, ArrayList<String>>();
- pathToAliases.put(inputDir.toString(), (ArrayList<String>) inputDirstr.clone());
- work.setMapperCannotSpanPartns(true);
- work.setPathToAliases(pathToAliases);
- work.setAliasToWork(
- new LinkedHashMap<String, Operator<? extends OperatorDesc>>());
- if (hasDynamicPartitions
- || GenMapRedUtils.isSkewedStoredAsDirs(fsInputDesc)) {
- work.getPathToPartitionInfo().put(inputDir.toString(),
- new PartitionDesc(tblDesc, null));
- }
- work.setListBucketingCtx(fsInputDesc.getLbCtx());
+ List<Path> inputDirs = new ArrayList<Path>(1);
+ ArrayList<String> inputDirstr = new ArrayList<String>(1);
+ // this will be populated by MergeFileWork.resolveDynamicPartitionStoredAsSubDirsMerge
+ // in case of dynamic partitioning and list bucketing
+ if (!hasDynamicPartitions &&
+ !GenMapRedUtils.isSkewedStoredAsDirs(fsInputDesc)) {
+ inputDirs.add(inputDir);
+ }
+ inputDirstr.add(inputDir.toString());
+
+ // internal input format class for CombineHiveInputFormat
+ final Class<? extends InputFormat> internalIFClass;
+ if (tblDesc.getInputFileFormatClass().equals(RCFileInputFormat.class)) {
+ internalIFClass = RCFileBlockMergeInputFormat.class;
+ } else if (tblDesc.getInputFileFormatClass().equals(OrcInputFormat.class)) {
+ internalIFClass = OrcFileStripeMergeInputFormat.class;
+ } else {
+ throw new SemanticException("createMergeTask called on a table with file"
+ + " format other than RCFile or ORCFile");
+ }
- return work;
+ // create the merge file work
+ MergeFileWork work = new MergeFileWork(inputDirs, finalName,
+ hasDynamicPartitions, tblDesc.getInputFileFormatClass().getName());
+ LinkedHashMap<String, ArrayList<String>> pathToAliases =
+ new LinkedHashMap<String, ArrayList<String>>();
+ pathToAliases.put(inputDir.toString(), inputDirstr);
+ work.setMapperCannotSpanPartns(true);
+ work.setPathToAliases(pathToAliases);
+ PartitionDesc pDesc = new PartitionDesc(tblDesc, null);
+ pDesc.setInputFileFormatClass(internalIFClass);
+ work.getPathToPartitionInfo().put(inputDir.toString(), pDesc);
+ work.setListBucketingCtx(fsInputDesc.getLbCtx());
+
+ // create alias to work which contains the merge operator
+ LinkedHashMap<String, Operator<? extends OperatorDesc>> aliasToWork =
+ new LinkedHashMap<String, Operator<? extends OperatorDesc>>();
+ Operator<? extends OperatorDesc> mergeOp = null;
+ final FileMergeDesc fmd;
+ if (tblDesc.getInputFileFormatClass().equals(RCFileInputFormat.class)) {
+ fmd = new RCFileMergeDesc();
+ } else {
+ fmd = new OrcFileMergeDesc();
}
+ fmd.setDpCtx(fsInputDesc.getDynPartCtx());
+ fmd.setOutputPath(finalName);
+ fmd.setHasDynamicPartitions(work.hasDynamicPartitions());
+ fmd.setListBucketingAlterTableConcatenate(work.isListBucketingAlterTableConcatenate());
+ int lbLevel = work.getListBucketingCtx() == null ? 0 :
+ work.getListBucketingCtx().calculateListBucketingLevel();
+ fmd.setListBucketingDepth(lbLevel);
+ mergeOp = OperatorFactory.get(fmd);
+ aliasToWork.put(inputDir.toString(), mergeOp);
+ work.setAliasToWork(aliasToWork);
- throw new SemanticException("createMergeTask called on a table with file"
- + " format other than RCFile or ORCFile");
+ return work;
}
/**
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Tue Sep 16 17:37:13 2014
@@ -118,7 +118,8 @@ public class Optimizer {
}
if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCORRELATION) &&
!HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEGROUPBYSKEW) &&
- !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME)) {
+ !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME) &&
+ !HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
transformations.add(new CorrelationOptimizer());
}
if (HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVELIMITPUSHDOWNMEMORYUSAGE) > 0) {
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java Tue Sep 16 17:37:13 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.optimi
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -42,6 +43,7 @@ import org.apache.hadoop.hive.ql.exec.Op
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -173,8 +175,22 @@ public class SortedDynPartitionOptimizer
destTable.getCols());
ObjectPair<List<Integer>, List<Integer>> sortOrderPositions = getSortPositionsOrder(
destTable.getSortCols(), destTable.getCols());
- List<Integer> sortPositions = sortOrderPositions.getFirst();
- List<Integer> sortOrder = sortOrderPositions.getSecond();
+ List<Integer> sortPositions = null;
+ List<Integer> sortOrder = null;
+ if (fsOp.getConf().getWriteType() == AcidUtils.Operation.UPDATE ||
+ fsOp.getConf().getWriteType() == AcidUtils.Operation.DELETE) {
+ // When doing updates and deletes we always want to sort on the rowid because the ACID
+ // reader will expect this sort order when doing reads. So
+ // ignore whatever comes from the table and enforce this sort order instead.
+ sortPositions = Arrays.asList(0);
+ sortOrder = Arrays.asList(1); // 1 means asc, could really use enum here in the thrift if
+ } else {
+ sortPositions = sortOrderPositions.getFirst();
+ sortOrder = sortOrderPositions.getSecond();
+ }
+ LOG.debug("Got sort order");
+ for (int i : sortPositions) LOG.debug("sort position " + i);
+ for (int i : sortOrder) LOG.debug("sort order " + i);
List<Integer> partitionPositions = getPartitionPositions(dpCtx, fsParent.getSchema());
List<ColumnInfo> colInfos = parseCtx.getOpParseCtx().get(fsParent).getRowResolver()
.getColumnInfos();
@@ -198,7 +214,7 @@ public class SortedDynPartitionOptimizer
colExprMap.put(ci.getInternalName(), newValueCols.get(newValueCols.size() - 1));
}
ReduceSinkDesc rsConf = getReduceSinkDesc(partitionPositions, sortPositions, sortOrder,
- newValueCols, bucketColumns, numBuckets, fsParent);
+ newValueCols, bucketColumns, numBuckets, fsParent, fsOp.getConf().getWriteType());
// Create ReduceSink operator
ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
@@ -319,7 +335,7 @@ public class SortedDynPartitionOptimizer
public ReduceSinkDesc getReduceSinkDesc(List<Integer> partitionPositions,
List<Integer> sortPositions, List<Integer> sortOrder, ArrayList<ExprNodeDesc> newValueCols,
ArrayList<ExprNodeDesc> bucketColumns, int numBuckets,
- Operator<? extends OperatorDesc> parent) {
+ Operator<? extends OperatorDesc> parent, AcidUtils.Operation writeType) {
// Order of KEY columns
// 1) Partition columns
@@ -409,7 +425,7 @@ public class SortedDynPartitionOptimizer
// Number of reducers is set to default (-1)
ReduceSinkDesc rsConf = new ReduceSinkDesc(newKeyCols, newKeyCols.size(), newValueCols,
outputKeyCols, distinctColumnIndices, outValColNames, -1, newPartCols, -1, keyTable,
- valueTable);
+ valueTable, writeType);
rsConf.setBucketCols(bucketColumns);
rsConf.setNumBuckets(numBuckets);
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java Tue Sep 16 17:37:13 2014
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.exec.Co
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.FetchTask;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.SelectOperator;
@@ -65,6 +66,7 @@ import org.apache.hadoop.hive.ql.plan.Fe
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCount;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMin;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -230,8 +232,10 @@ public class StatsOptimizer implements T
// our stats for NDV is approx, not accurate.
return null;
}
- if (aggr.getGenericUDAFName().equals(AnnotationUtils.getAnnotation(GenericUDAFSum.class,
- Description.class).name())) {
+ // Get the aggregate function matching the name in the query.
+ GenericUDAFResolver udaf =
+ FunctionRegistry.getGenericUDAFResolver(aggr.getGenericUDAFName());
+ if (udaf instanceof GenericUDAFSum) {
if(!(aggr.getParameters().get(0) instanceof ExprNodeConstantDesc)){
return null;
}
@@ -244,8 +248,7 @@ public class StatsOptimizer implements T
ois.add(PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
PrimitiveCategory.DECIMAL));
}
- else if (aggr.getGenericUDAFName().equals(AnnotationUtils.getAnnotation(GenericUDAFCount.class,
- Description.class).name())) {
+ else if (udaf instanceof GenericUDAFCount) {
Long rowCnt = 0L;
if ((aggr.getParameters().isEmpty() || aggr.getParameters().get(0) instanceof
ExprNodeConstantDesc)) {
@@ -326,8 +329,7 @@ public class StatsOptimizer implements T
oneRow.add(rowCnt);
ois.add(PrimitiveObjectInspectorFactory.
getPrimitiveJavaObjectInspector(PrimitiveCategory.LONG));
- } else if (aggr.getGenericUDAFName().equals(GenericUDAFMax.class.getAnnotation(
- Description.class).name())) {
+ } else if (udaf instanceof GenericUDAFMax) {
ExprNodeColumnDesc colDesc = (ExprNodeColumnDesc)aggr.getParameters().get(0);
String colName = colDesc.getColumn();
StatType type = getType(colDesc.getTypeString());
@@ -416,8 +418,7 @@ public class StatsOptimizer implements T
return null;
}
}
- } else if (aggr.getGenericUDAFName().equals(GenericUDAFMin.class.getAnnotation(
- Description.class).name())) {
+ } else if (udaf instanceof GenericUDAFMin) {
ExprNodeColumnDesc colDesc = (ExprNodeColumnDesc)aggr.getParameters().get(0);
String colName = colDesc.getColumn();
StatType type = getType(colDesc.getTypeString());
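The StatsOptimizer hunks above replace per-UDAF string comparisons against each aggregate's
@Description annotation name with a single FunctionRegistry lookup followed by instanceof
checks on the resolver. A simplified sketch of the resulting branching (branch bodies elided;
they are unchanged by this diff):

    GenericUDAFResolver udaf =
        FunctionRegistry.getGenericUDAFResolver(aggr.getGenericUDAFName());
    if (udaf instanceof GenericUDAFSum) {
      // answer sum() from column statistics; instanceof is false for null, so an
      // unrecognized aggregate simply falls through and the rewrite is skipped
    } else if (udaf instanceof GenericUDAFCount) {
      // answer count() from the basic row-count statistics
    } else if (udaf instanceof GenericUDAFMax) {
      // answer max() from the column's maximum statistic
    } else if (udaf instanceof GenericUDAFMin) {
      // answer min() from the column's minimum statistic
    }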
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/AppMasterEventProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/AppMasterEventProcessor.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/AppMasterEventProcessor.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/AppMasterEventProcessor.java Tue Sep 16 17:37:13 2014
@@ -56,6 +56,7 @@ public class AppMasterEventProcessor imp
} else {
events = new ArrayList<AppMasterEventOperator>();
}
+ events.add(event);
context.tsToEventMap.put(desc.getTableScan(), events);
return true;
}
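The one-line AppMasterEventProcessor fix above adds the event that triggered the call to the
list before the list is put back into tsToEventMap; previously the map entry was refreshed
without the new event. A reconstruction of the surrounding logic for context (the if-condition
is inferred from the else branch visible in the hunk):

    List<AppMasterEventOperator> events;
    if (context.tsToEventMap.containsKey(desc.getTableScan())) {
      events = context.tsToEventMap.get(desc.getTableScan());
    } else {
      events = new ArrayList<AppMasterEventOperator>();
    }
    events.add(event);  // the added line
    context.tsToEventMap.put(desc.getTableScan(), events);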
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java Tue Sep 16 17:37:13 2014
@@ -31,6 +31,7 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import org.antlr.runtime.tree.CommonTree;
import org.antlr.runtime.tree.Tree;
@@ -60,6 +61,7 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -89,6 +91,13 @@ public abstract class BaseSemanticAnalyz
protected HashMap<String, String> idToTableNameMap;
protected QueryProperties queryProperties;
+ /**
+ * A set of FileSinkDescs for FileSinkOperators that write in an ACID compliant way. We need to remember
+ * them here because when we build them we don't yet know the transaction id. We need to go
+ * back and set it once we actually start running the query.
+ */
+ protected Set<FileSinkDesc> acidFileSinks = new HashSet<FileSinkDesc>();
+
public static int HIVE_COLUMN_ORDER_ASC = 1;
public static int HIVE_COLUMN_ORDER_DESC = 0;
@@ -943,6 +952,10 @@ public abstract class BaseSemanticAnalyz
return queryProperties;
}
+ public Set<FileSinkDesc> getAcidFileSinks() {
+ return acidFileSinks;
+ }
+
/**
* Construct list bucketing context.
*
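The new acidFileSinks set in BaseSemanticAnalyzer collects the FileSinkDesc of every ACID
write found during analysis so that the transaction id, which is not yet known at that point,
can be stamped on them once the query actually starts. A hypothetical consumer sketch (the
variable sem and the setter on FileSinkDesc are illustrative; only the set and its getter are
introduced by this diff):

    long txnId = /* obtained from the transaction manager once the query starts */ -1L;
    for (FileSinkDesc fileSink : sem.getAcidFileSinks()) {
      fileSink.setTransactionId(txnId);  // assumed setter name
    }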
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java Tue Sep 16 17:37:13 2014
@@ -28,6 +28,7 @@ import org.antlr.runtime.tree.Tree;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.Task;
@@ -70,7 +71,7 @@ public class ExportSemanticAnalyzer exte
"Target is not a directory : " + toURI));
} else {
FileStatus[] files = fs.listStatus(toPath);
- if (files != null) {
+ if (files != null && files.length != 0) {
throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ast,
"Target is not an empty directory : " + toURI));
}
@@ -120,6 +121,7 @@ public class ExportSemanticAnalyzer exte
rootTasks.add(rTask);
inputs.add(new ReadEntity(ts.tableHandle));
}
- outputs.add(new WriteEntity(parentPath, toURI.getScheme().equals("hdfs")));
+ boolean isLocal = FileUtils.isLocalFile(conf, toURI);
+ outputs.add(new WriteEntity(parentPath, isLocal));
}
}
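Two behavior changes in ExportSemanticAnalyzer are worth restating. FileSystem.listStatus()
returns an empty array, not null, for an existing empty directory, so the old null check
rejected valid empty export targets; and the WriteEntity's local-vs-DFS flag is now derived
from the filesystem that actually backs the target URI instead of a hard-coded scheme
comparison. Both fixes, restated with the reasoning as comments (all identifiers are taken
from the hunks above):

    FileStatus[] files = fs.listStatus(toPath);
    if (files != null && files.length != 0) {
      // only a non-empty listing makes the target unusable
      throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ast,
          "Target is not an empty directory : " + toURI));
    }
    // (later in the same method)
    boolean isLocal = FileUtils.isLocalFile(conf, toURI);
    outputs.add(new WriteEntity(parentPath, isLocal));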
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java Tue Sep 16 17:37:13 2014
@@ -18,11 +18,24 @@
package org.apache.hadoop.hive.ql.parse;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
import org.antlr.runtime.tree.Tree;
import org.apache.commons.lang.ObjectUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
@@ -35,21 +48,22 @@ import org.apache.hadoop.hive.ql.ErrorMs
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.plan.*;
+import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
+import org.apache.hadoop.hive.ql.plan.CopyWork;
+import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.serde.serdeConstants;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.*;
-
/**
* ImportSemanticAnalyzer.
*
@@ -82,6 +96,8 @@ public class ImportSemanticAnalyzer exte
List<AddPartitionDesc> partitionDescs = new ArrayList<AddPartitionDesc>();
Path fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(),
fromURI.getPath());
+ boolean isLocal = FileUtils.isLocalFile(conf, fromURI);
+ inputs.add(new ReadEntity(fromPath, isLocal));
try {
Path metadataPath = new Path(fromPath, METADATA_NAME);
Map.Entry<org.apache.hadoop.hive.metastore.api.Table,
@@ -475,7 +491,7 @@ public class ImportSemanticAnalyzer exte
String importedSerdeFormat = tableDesc.getSerdeProps().get(
serdeConstants.SERIALIZATION_FORMAT);
/*
- * If Imported SerdeFormat is null, then set it to "1" just as
+ * If Imported SerdeFormat is null, then set it to "1" just as
* metadata.Table.getEmptyTable
*/
importedSerdeFormat = importedSerdeFormat == null ? "1" : importedSerdeFormat;
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java Tue Sep 16 17:37:13 2014
@@ -172,6 +172,15 @@ public class QBParseInfo {
return insertIntoTables.contains(fullName.toLowerCase());
}
+ /**
+ * Check whether a table is in the list of tables being inserted into.
+ * @param fullTableName table name in dbname.tablename format
+ * @return true if the table is a target of an INSERT INTO in this query
+ */
+ public boolean isInsertIntoTable(String fullTableName) {
+ return insertIntoTables.contains(fullTableName.toLowerCase());
+ }
+
public HashMap<String, ASTNode> getAggregationExprsForClause(String clause) {
return destToAggregationExprs.get(clause);
}
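The new QBParseInfo overload accepts an already-qualified "dbname.tablename" string, so callers
that carry the full name do not need to split it back into database and table parts. A brief
hypothetical usage sketch ("default.acid_target" is an illustrative name; the lookup lower-cases
its argument to match how insert targets are recorded):

    if (qbParseInfo.isInsertIntoTable("default.acid_target")) {
      // append semantics (INSERT INTO) rather than INSERT OVERWRITE
    }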