Posted to commits@hive.apache.org by se...@apache.org on 2014/09/16 19:37:19 UTC
svn commit: r1625341 [2/12] - in /hive/branches/llap: ./
common/src/java/org/apache/hadoop/hive/common/
common/src/java/org/apache/hadoop/hive/conf/
contrib/src/test/results/clientpositive/ data/conf/tez/ data/files/
hbase-handler/ itests/hive-unit-had...
Modified: hive/branches/llap/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ShowCompactResponseElement.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ShowCompactResponseElement.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ShowCompactResponseElement.java (original)
+++ hive/branches/llap/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/ShowCompactResponseElement.java Tue Sep 16 17:37:13 2014
@@ -51,12 +51,12 @@ public class ShowCompactResponseElement
private String dbname; // required
private String tablename; // required
- private String partitionname; // required
+ private String partitionname; // optional
private CompactionType type; // required
private String state; // required
- private String workerid; // required
- private long start; // required
- private String runAs; // required
+ private String workerid; // optional
+ private long start; // optional
+ private String runAs; // optional
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -144,6 +144,7 @@ public class ShowCompactResponseElement
// isset id assignments
private static final int __START_ISSET_ID = 0;
private byte __isset_bitfield = 0;
+ private _Fields optionals[] = {_Fields.PARTITIONNAME,_Fields.WORKERID,_Fields.START,_Fields.RUN_AS};
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -151,17 +152,17 @@ public class ShowCompactResponseElement
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.TABLENAME, new org.apache.thrift.meta_data.FieldMetaData("tablename", org.apache.thrift.TFieldRequirementType.REQUIRED,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
- tmpMap.put(_Fields.PARTITIONNAME, new org.apache.thrift.meta_data.FieldMetaData("partitionname", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ tmpMap.put(_Fields.PARTITIONNAME, new org.apache.thrift.meta_data.FieldMetaData("partitionname", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.TYPE, new org.apache.thrift.meta_data.FieldMetaData("type", org.apache.thrift.TFieldRequirementType.REQUIRED,
new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, CompactionType.class)));
tmpMap.put(_Fields.STATE, new org.apache.thrift.meta_data.FieldMetaData("state", org.apache.thrift.TFieldRequirementType.REQUIRED,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
- tmpMap.put(_Fields.WORKERID, new org.apache.thrift.meta_data.FieldMetaData("workerid", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ tmpMap.put(_Fields.WORKERID, new org.apache.thrift.meta_data.FieldMetaData("workerid", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
- tmpMap.put(_Fields.START, new org.apache.thrift.meta_data.FieldMetaData("start", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ tmpMap.put(_Fields.START, new org.apache.thrift.meta_data.FieldMetaData("start", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
- tmpMap.put(_Fields.RUN_AS, new org.apache.thrift.meta_data.FieldMetaData("runAs", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ tmpMap.put(_Fields.RUN_AS, new org.apache.thrift.meta_data.FieldMetaData("runAs", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ShowCompactResponseElement.class, metaDataMap);
@@ -173,23 +174,14 @@ public class ShowCompactResponseElement
public ShowCompactResponseElement(
String dbname,
String tablename,
- String partitionname,
CompactionType type,
- String state,
- String workerid,
- long start,
- String runAs)
+ String state)
{
this();
this.dbname = dbname;
this.tablename = tablename;
- this.partitionname = partitionname;
this.type = type;
this.state = state;
- this.workerid = workerid;
- this.start = start;
- setStartIsSet(true);
- this.runAs = runAs;
}
/**
@@ -622,8 +614,8 @@ public class ShowCompactResponseElement
return false;
}
- boolean this_present_start = true;
- boolean that_present_start = true;
+ boolean this_present_start = true && this.isSetStart();
+ boolean that_present_start = true && that.isSetStart();
if (this_present_start || that_present_start) {
if (!(this_present_start && that_present_start))
return false;
@@ -677,7 +669,7 @@ public class ShowCompactResponseElement
if (present_workerid)
builder.append(workerid);
- boolean present_start = true;
+ boolean present_start = true && (isSetStart());
builder.append(present_start);
if (present_start)
builder.append(start);
@@ -813,14 +805,16 @@ public class ShowCompactResponseElement
sb.append(this.tablename);
}
first = false;
- if (!first) sb.append(", ");
- sb.append("partitionname:");
- if (this.partitionname == null) {
- sb.append("null");
- } else {
- sb.append(this.partitionname);
+ if (isSetPartitionname()) {
+ if (!first) sb.append(", ");
+ sb.append("partitionname:");
+ if (this.partitionname == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.partitionname);
+ }
+ first = false;
}
- first = false;
if (!first) sb.append(", ");
sb.append("type:");
if (this.type == null) {
@@ -837,26 +831,32 @@ public class ShowCompactResponseElement
sb.append(this.state);
}
first = false;
- if (!first) sb.append(", ");
- sb.append("workerid:");
- if (this.workerid == null) {
- sb.append("null");
- } else {
- sb.append(this.workerid);
+ if (isSetWorkerid()) {
+ if (!first) sb.append(", ");
+ sb.append("workerid:");
+ if (this.workerid == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.workerid);
+ }
+ first = false;
}
- first = false;
- if (!first) sb.append(", ");
- sb.append("start:");
- sb.append(this.start);
- first = false;
- if (!first) sb.append(", ");
- sb.append("runAs:");
- if (this.runAs == null) {
- sb.append("null");
- } else {
- sb.append(this.runAs);
+ if (isSetStart()) {
+ if (!first) sb.append(", ");
+ sb.append("start:");
+ sb.append(this.start);
+ first = false;
+ }
+ if (isSetRunAs()) {
+ if (!first) sb.append(", ");
+ sb.append("runAs:");
+ if (this.runAs == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.runAs);
+ }
+ first = false;
}
- first = false;
sb.append(")");
return sb.toString();
}
@@ -871,10 +871,6 @@ public class ShowCompactResponseElement
throw new org.apache.thrift.protocol.TProtocolException("Required field 'tablename' is unset! Struct:" + toString());
}
- if (!isSetPartitionname()) {
- throw new org.apache.thrift.protocol.TProtocolException("Required field 'partitionname' is unset! Struct:" + toString());
- }
-
if (!isSetType()) {
throw new org.apache.thrift.protocol.TProtocolException("Required field 'type' is unset! Struct:" + toString());
}
@@ -883,18 +879,6 @@ public class ShowCompactResponseElement
throw new org.apache.thrift.protocol.TProtocolException("Required field 'state' is unset! Struct:" + toString());
}
- if (!isSetWorkerid()) {
- throw new org.apache.thrift.protocol.TProtocolException("Required field 'workerid' is unset! Struct:" + toString());
- }
-
- if (!isSetStart()) {
- throw new org.apache.thrift.protocol.TProtocolException("Required field 'start' is unset! Struct:" + toString());
- }
-
- if (!isSetRunAs()) {
- throw new org.apache.thrift.protocol.TProtocolException("Required field 'runAs' is unset! Struct:" + toString());
- }
-
// check for sub-struct validity
}
@@ -1022,9 +1006,11 @@ public class ShowCompactResponseElement
oprot.writeFieldEnd();
}
if (struct.partitionname != null) {
- oprot.writeFieldBegin(PARTITIONNAME_FIELD_DESC);
- oprot.writeString(struct.partitionname);
- oprot.writeFieldEnd();
+ if (struct.isSetPartitionname()) {
+ oprot.writeFieldBegin(PARTITIONNAME_FIELD_DESC);
+ oprot.writeString(struct.partitionname);
+ oprot.writeFieldEnd();
+ }
}
if (struct.type != null) {
oprot.writeFieldBegin(TYPE_FIELD_DESC);
@@ -1037,17 +1023,23 @@ public class ShowCompactResponseElement
oprot.writeFieldEnd();
}
if (struct.workerid != null) {
- oprot.writeFieldBegin(WORKERID_FIELD_DESC);
- oprot.writeString(struct.workerid);
+ if (struct.isSetWorkerid()) {
+ oprot.writeFieldBegin(WORKERID_FIELD_DESC);
+ oprot.writeString(struct.workerid);
+ oprot.writeFieldEnd();
+ }
+ }
+ if (struct.isSetStart()) {
+ oprot.writeFieldBegin(START_FIELD_DESC);
+ oprot.writeI64(struct.start);
oprot.writeFieldEnd();
}
- oprot.writeFieldBegin(START_FIELD_DESC);
- oprot.writeI64(struct.start);
- oprot.writeFieldEnd();
if (struct.runAs != null) {
- oprot.writeFieldBegin(RUN_AS_FIELD_DESC);
- oprot.writeString(struct.runAs);
- oprot.writeFieldEnd();
+ if (struct.isSetRunAs()) {
+ oprot.writeFieldBegin(RUN_AS_FIELD_DESC);
+ oprot.writeString(struct.runAs);
+ oprot.writeFieldEnd();
+ }
}
oprot.writeFieldStop();
oprot.writeStructEnd();
@@ -1068,12 +1060,34 @@ public class ShowCompactResponseElement
TTupleProtocol oprot = (TTupleProtocol) prot;
oprot.writeString(struct.dbname);
oprot.writeString(struct.tablename);
- oprot.writeString(struct.partitionname);
oprot.writeI32(struct.type.getValue());
oprot.writeString(struct.state);
- oprot.writeString(struct.workerid);
- oprot.writeI64(struct.start);
- oprot.writeString(struct.runAs);
+ BitSet optionals = new BitSet();
+ if (struct.isSetPartitionname()) {
+ optionals.set(0);
+ }
+ if (struct.isSetWorkerid()) {
+ optionals.set(1);
+ }
+ if (struct.isSetStart()) {
+ optionals.set(2);
+ }
+ if (struct.isSetRunAs()) {
+ optionals.set(3);
+ }
+ oprot.writeBitSet(optionals, 4);
+ if (struct.isSetPartitionname()) {
+ oprot.writeString(struct.partitionname);
+ }
+ if (struct.isSetWorkerid()) {
+ oprot.writeString(struct.workerid);
+ }
+ if (struct.isSetStart()) {
+ oprot.writeI64(struct.start);
+ }
+ if (struct.isSetRunAs()) {
+ oprot.writeString(struct.runAs);
+ }
}
@Override
@@ -1083,18 +1097,27 @@ public class ShowCompactResponseElement
struct.setDbnameIsSet(true);
struct.tablename = iprot.readString();
struct.setTablenameIsSet(true);
- struct.partitionname = iprot.readString();
- struct.setPartitionnameIsSet(true);
struct.type = CompactionType.findByValue(iprot.readI32());
struct.setTypeIsSet(true);
struct.state = iprot.readString();
struct.setStateIsSet(true);
- struct.workerid = iprot.readString();
- struct.setWorkeridIsSet(true);
- struct.start = iprot.readI64();
- struct.setStartIsSet(true);
- struct.runAs = iprot.readString();
- struct.setRunAsIsSet(true);
+ BitSet incoming = iprot.readBitSet(4);
+ if (incoming.get(0)) {
+ struct.partitionname = iprot.readString();
+ struct.setPartitionnameIsSet(true);
+ }
+ if (incoming.get(1)) {
+ struct.workerid = iprot.readString();
+ struct.setWorkeridIsSet(true);
+ }
+ if (incoming.get(2)) {
+ struct.start = iprot.readI64();
+ struct.setStartIsSet(true);
+ }
+ if (incoming.get(3)) {
+ struct.runAs = iprot.readString();
+ struct.setRunAsIsSet(true);
+ }
}
}
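The net effect of this change is that partitionname, workerid, start and runAs become optional in the generated ShowCompactResponseElement, so consumers must guard them with the generated isSet* accessors instead of assuming they are populated. A minimal client-side sketch, assuming the usual Thrift bean setters/getters (setWorkerid, setStart, getStart, getRunAs) and an illustrative "initiated" state string:

import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;

public class ShowCompactOptionalFieldsSketch {
  public static void main(String[] args) {
    // The regenerated constructor now takes only the required fields.
    ShowCompactResponseElement e =
        new ShowCompactResponseElement("default", "t1", CompactionType.MINOR, "initiated");

    // Optional fields are filled in through setters when the information exists
    // (setter names assumed to follow the standard Thrift bean convention).
    e.setWorkerid("host-1234");
    e.setStart(System.currentTimeMillis());

    // Readers have to guard the optional fields with the generated isSet checks.
    if (e.isSetStart()) {
      System.out.println("compaction started at " + e.getStart());
    }
    if (!e.isSetRunAs()) {
      System.out.println("runAs not reported");
    }
  }
}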
Modified: hive/branches/llap/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/StorageDescriptor.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/StorageDescriptor.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/StorageDescriptor.java (original)
+++ hive/branches/llap/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/StorageDescriptor.java Tue Sep 16 17:37:13 2014
@@ -216,17 +216,17 @@ public class StorageDescriptor implement
{
this();
this.cols = cols;
- this.location = org.apache.hive.common.util.HiveStringUtils.intern(location);
- this.inputFormat = org.apache.hive.common.util.HiveStringUtils.intern(inputFormat);
- this.outputFormat = org.apache.hive.common.util.HiveStringUtils.intern(outputFormat);
+ this.location = location;
+ this.inputFormat = inputFormat;
+ this.outputFormat = outputFormat;
this.compressed = compressed;
setCompressedIsSet(true);
this.numBuckets = numBuckets;
setNumBucketsIsSet(true);
this.serdeInfo = serdeInfo;
- this.bucketCols = org.apache.hive.common.util.HiveStringUtils.intern(bucketCols);
+ this.bucketCols = bucketCols;
this.sortCols = sortCols;
- this.parameters = org.apache.hive.common.util.HiveStringUtils.intern(parameters);
+ this.parameters = parameters;
}
/**
@@ -242,13 +242,13 @@ public class StorageDescriptor implement
this.cols = __this__cols;
}
if (other.isSetLocation()) {
- this.location = org.apache.hive.common.util.HiveStringUtils.intern(other.location);
+ this.location = other.location;
}
if (other.isSetInputFormat()) {
- this.inputFormat = org.apache.hive.common.util.HiveStringUtils.intern(other.inputFormat);
+ this.inputFormat = other.inputFormat;
}
if (other.isSetOutputFormat()) {
- this.outputFormat = org.apache.hive.common.util.HiveStringUtils.intern(other.outputFormat);
+ this.outputFormat = other.outputFormat;
}
this.compressed = other.compressed;
this.numBuckets = other.numBuckets;
@@ -276,9 +276,9 @@ public class StorageDescriptor implement
String other_element_key = other_element.getKey();
String other_element_value = other_element.getValue();
- String __this__parameters_copy_key = org.apache.hive.common.util.HiveStringUtils.intern(other_element_key);
+ String __this__parameters_copy_key = other_element_key;
- String __this__parameters_copy_value = org.apache.hive.common.util.HiveStringUtils.intern(other_element_value);
+ String __this__parameters_copy_value = other_element_value;
__this__parameters.put(__this__parameters_copy_key, __this__parameters_copy_value);
}
@@ -356,7 +356,7 @@ public class StorageDescriptor implement
}
public void setLocation(String location) {
- this.location = org.apache.hive.common.util.HiveStringUtils.intern(location);
+ this.location = location;
}
public void unsetLocation() {
@@ -379,7 +379,7 @@ public class StorageDescriptor implement
}
public void setInputFormat(String inputFormat) {
- this.inputFormat = org.apache.hive.common.util.HiveStringUtils.intern(inputFormat);
+ this.inputFormat = inputFormat;
}
public void unsetInputFormat() {
@@ -402,7 +402,7 @@ public class StorageDescriptor implement
}
public void setOutputFormat(String outputFormat) {
- this.outputFormat = org.apache.hive.common.util.HiveStringUtils.intern(outputFormat);
+ this.outputFormat = outputFormat;
}
public void unsetOutputFormat() {
@@ -507,7 +507,7 @@ public class StorageDescriptor implement
}
public void setBucketCols(List<String> bucketCols) {
- this.bucketCols = org.apache.hive.common.util.HiveStringUtils.intern(bucketCols);
+ this.bucketCols = bucketCols;
}
public void unsetBucketCols() {
@@ -579,7 +579,7 @@ public class StorageDescriptor implement
}
public void setParameters(Map<String,String> parameters) {
- this.parameters = org.apache.hive.common.util.HiveStringUtils.intern(parameters);
+ this.parameters = parameters;
}
public void unsetParameters() {
Modified: hive/branches/llap/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
URL: http://svn.apache.org/viewvc/hive/branches/llap/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py (original)
+++ hive/branches/llap/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py Tue Sep 16 17:37:13 2014
@@ -8054,18 +8054,10 @@ class ShowCompactResponseElement:
raise TProtocol.TProtocolException(message='Required field dbname is unset!')
if self.tablename is None:
raise TProtocol.TProtocolException(message='Required field tablename is unset!')
- if self.partitionname is None:
- raise TProtocol.TProtocolException(message='Required field partitionname is unset!')
if self.type is None:
raise TProtocol.TProtocolException(message='Required field type is unset!')
if self.state is None:
raise TProtocol.TProtocolException(message='Required field state is unset!')
- if self.workerid is None:
- raise TProtocol.TProtocolException(message='Required field workerid is unset!')
- if self.start is None:
- raise TProtocol.TProtocolException(message='Required field start is unset!')
- if self.runAs is None:
- raise TProtocol.TProtocolException(message='Required field runAs is unset!')
return
Modified: hive/branches/llap/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
URL: http://svn.apache.org/viewvc/hive/branches/llap/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb (original)
+++ hive/branches/llap/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb Tue Sep 16 17:37:13 2014
@@ -1946,12 +1946,12 @@ class ShowCompactResponseElement
FIELDS = {
DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbname'},
TABLENAME => {:type => ::Thrift::Types::STRING, :name => 'tablename'},
- PARTITIONNAME => {:type => ::Thrift::Types::STRING, :name => 'partitionname'},
+ PARTITIONNAME => {:type => ::Thrift::Types::STRING, :name => 'partitionname', :optional => true},
TYPE => {:type => ::Thrift::Types::I32, :name => 'type', :enum_class => ::CompactionType},
STATE => {:type => ::Thrift::Types::STRING, :name => 'state'},
- WORKERID => {:type => ::Thrift::Types::STRING, :name => 'workerid'},
- START => {:type => ::Thrift::Types::I64, :name => 'start'},
- RUNAS => {:type => ::Thrift::Types::STRING, :name => 'runAs'}
+ WORKERID => {:type => ::Thrift::Types::STRING, :name => 'workerid', :optional => true},
+ START => {:type => ::Thrift::Types::I64, :name => 'start', :optional => true},
+ RUNAS => {:type => ::Thrift::Types::STRING, :name => 'runAs', :optional => true}
}
def struct_fields; FIELDS; end
@@ -1959,12 +1959,8 @@ class ShowCompactResponseElement
def validate
raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field dbname is unset!') unless @dbname
raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field tablename is unset!') unless @tablename
- raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field partitionname is unset!') unless @partitionname
raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field type is unset!') unless @type
raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field state is unset!') unless @state
- raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field workerid is unset!') unless @workerid
- raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field start is unset!') unless @start
- raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field runAs is unset!') unless @runAs
unless @type.nil? || ::CompactionType::VALID_VALUES.include?(@type)
raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Invalid value of field type!')
end
Modified: hive/branches/llap/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java (original)
+++ hive/branches/llap/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java Tue Sep 16 17:37:13 2014
@@ -21,8 +21,6 @@ package org.apache.hadoop.hive.metastore
import static org.apache.commons.lang.StringUtils.join;
import static org.apache.commons.lang.StringUtils.repeat;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.SQLException;
import java.text.ParseException;
@@ -42,13 +40,11 @@ import javax.jdo.datastore.JDOConnection
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.commons.math3.stat.StatUtils;
import org.apache.hadoop.hive.metastore.api.AggrStats;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
-import org.apache.hadoop.hive.metastore.api.Decimal;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Order;
Modified: hive/branches/llap/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java (original)
+++ hive/branches/llap/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java Tue Sep 16 17:37:13 2014
@@ -1081,7 +1081,8 @@ public class TxnHandler {
private static Map<LockType, Map<LockType, Map<LockState, LockAction>>> jumpTable;
private void checkQFileTestHack() {
- boolean hackOn = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST);
+ boolean hackOn = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST) ||
+ HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEZ_TEST);
if (hackOn) {
LOG.info("Hacking in canned values for transaction manager");
// Set up the transaction/locking db in the derby metastore
Modified: hive/branches/llap/metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStorePartitionSpecs.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStorePartitionSpecs.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStorePartitionSpecs.java (original)
+++ hive/branches/llap/metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStorePartitionSpecs.java Tue Sep 16 17:37:13 2014
@@ -11,7 +11,6 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.partition.spec.CompositePartitionSpecProxy;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
-import org.apache.hadoop.util.ExitUtil;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -52,7 +51,7 @@ public class TestHiveMetaStorePartitionS
public void checkExit(int status) {
super.checkExit(status);
- throw new ExitUtil.ExitException(status, "System.exit() was called. Raising exception. ");
+ throw new RuntimeException("System.exit() was called. Raising exception. ");
}
}
Modified: hive/branches/llap/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java (original)
+++ hive/branches/llap/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/OperatorType.java Tue Sep 16 17:37:13 2014
@@ -7,10 +7,6 @@
package org.apache.hadoop.hive.ql.plan.api;
-import java.util.Map;
-import java.util.HashMap;
-import org.apache.thrift.TEnum;
-
public enum OperatorType implements org.apache.thrift.TEnum {
JOIN(0),
MAPJOIN(1),
@@ -33,7 +29,9 @@ public enum OperatorType implements org.
PTF(18),
MUX(19),
DEMUX(20),
- EVENT(21);
+ EVENT(21),
+ ORCFILEMERGE(22),
+ RCFILEMERGE(23);
private final int value;
@@ -98,6 +96,10 @@ public enum OperatorType implements org.
return DEMUX;
case 21:
return EVENT;
+ case 22:
+ return ORCFILEMERGE;
+ case 23:
+ return RCFILEMERGE;
default:
return null;
}
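With the two new enum values, OperatorType.findByValue resolves the wire values 22 and 23 to the merge operators and still returns null for anything unknown, as a small sketch illustrates:

import org.apache.hadoop.hive.ql.plan.api.OperatorType;

public class OperatorTypeLookupSketch {
  public static void main(String[] args) {
    System.out.println(OperatorType.findByValue(22)); // ORCFILEMERGE
    System.out.println(OperatorType.findByValue(23)); // RCFILEMERGE
    System.out.println(OperatorType.findByValue(99)); // null for values the enum does not know
  }
}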
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/Context.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/Context.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/Context.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/Context.java Tue Sep 16 17:37:13 2014
@@ -43,6 +43,7 @@ import org.apache.hadoop.hive.common.Fil
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.TaskRunner;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
@@ -98,6 +99,11 @@ public class Context {
// Transaction manager for this query
protected HiveTxnManager hiveTxnManager;
+ // Used to track what type of acid operation (insert, update, or delete) we are doing. Useful
+ // since we want to change where bucket columns are accessed in some operators and
+ // optimizations when doing updates and deletes.
+ private AcidUtils.Operation acidOperation = AcidUtils.Operation.NOT_ACID;
+
private boolean needLockMgr;
// Keep track of the mapping from load table desc to the output and the lock
@@ -612,4 +618,12 @@ public class Context {
public void setTryCount(int tryCount) {
this.tryCount = tryCount;
}
+
+ public void setAcidOperation(AcidUtils.Operation op) {
+ acidOperation = op;
+ }
+
+ public AcidUtils.Operation getAcidOperation() {
+ return acidOperation;
+ }
}
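The new acidOperation field defaults to NOT_ACID and lets the planner mark a query as an ACID insert, update or delete so later operators and optimizations can adjust how bucket columns are handled. A minimal sketch of the setter/getter pair added here, assuming the existing Context(Configuration) constructor:

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.io.AcidUtils;

public class AcidOperationSketch {
  public static void main(String[] args) throws Exception {
    Context ctx = new Context(new HiveConf()); // constructor signature assumed from existing usage

    // Default is NOT_ACID; the analyzer would flag an UPDATE statement like this.
    ctx.setAcidOperation(AcidUtils.Operation.UPDATE);

    if (ctx.getAcidOperation() != AcidUtils.Operation.NOT_ACID) {
      System.out.println("planning an ACID write: " + ctx.getAcidOperation());
    }
  }
}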
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Tue Sep 16 17:37:13 2014
@@ -44,6 +44,7 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.MoveTask;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
@@ -95,6 +96,7 @@ import org.apache.hadoop.hive.ql.parse.S
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.VariableSubstitution;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -136,6 +138,9 @@ public class Driver implements CommandPr
private String SQLState;
private Throwable downstreamError;
+ // A list of FileSinkOperators writing in an ACID compliant manner
+ private Set<FileSinkDesc> acidSinks;
+
// A limit on the number of threads that can be launched
private int maxthreads;
private int tryCount = Integer.MAX_VALUE;
@@ -407,6 +412,9 @@ public class Driver implements CommandPr
} else {
sem.analyze(tree, ctx);
}
+ // Record any ACID compliant FileSinkOperators we saw so we can add our transaction ID to
+ // them later.
+ acidSinks = sem.getAcidFileSinks();
LOG.info("Semantic Analysis Completed");
@@ -722,6 +730,11 @@ public class Driver implements CommandPr
//do not authorize temporary uris
continue;
}
+ if (privObject instanceof ReadEntity && ((ReadEntity)privObject).isUpdateOrDelete()) {
+ // Skip this one, as we don't want to check select privileges for the table we're reading
+ // for an update or delete.
+ continue;
+ }
//support for authorization on partitions needs to be added
String dbname = null;
@@ -858,7 +871,9 @@ public class Driver implements CommandPr
private int recordValidTxns() {
try {
ValidTxnList txns = SessionState.get().getTxnMgr().getValidTxns();
- conf.set(ValidTxnList.VALID_TXNS_KEY, txns.toString());
+ String txnStr = txns.toString();
+ conf.set(ValidTxnList.VALID_TXNS_KEY, txnStr);
+ LOG.debug("Encoding valid txns info " + txnStr);
return 0;
} catch (LockException e) {
errorMessage = "FAILED: Error in determing valid transactions: " + e.getMessage();
@@ -876,13 +891,44 @@ public class Driver implements CommandPr
* pretty simple. If all the locks cannot be obtained, error out. Deadlock is avoided by making
* sure that the locks are lexicographically sorted.
**/
- private int acquireReadWriteLocks() {
+ private int acquireLocksAndOpenTxn() {
PerfLogger perfLogger = PerfLogger.getPerfLogger();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ACQUIRE_READ_WRITE_LOCKS);
+ SessionState ss = SessionState.get();
+ HiveTxnManager txnMgr = ss.getTxnMgr();
try {
- SessionState.get().getTxnMgr().acquireLocks(plan, ctx, userName);
+ // Don't use the userName member, as it may or may not have been set. Get the value from
+ // conf, which calls into getUGI to figure out who the process is running as.
+ String userFromUGI;
+ try {
+ userFromUGI = conf.getUser();
+ } catch (IOException e) {
+ errorMessage = "FAILED: Error in determining user while acquiring locks: " + e.getMessage();
+ SQLState = ErrorMsg.findSQLState(e.getMessage());
+ downstreamError = e;
+ console.printError(errorMessage,
+ "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ return 10;
+ }
+ if (acidSinks != null && acidSinks.size() > 0) {
+ // We are writing to tables in an ACID compliant way, so we need to open a transaction
+ long txnId = ss.getCurrentTxn();
+ if (txnId == SessionState.NO_CURRENT_TXN) {
+ txnId = txnMgr.openTxn(userFromUGI);
+ ss.setCurrentTxn(txnId);
+ }
+ // Set the transaction id in all of the acid file sinks
+ if (acidSinks != null) {
+ for (FileSinkDesc desc : acidSinks) {
+ desc.setTransactionId(txnId);
+ }
+ }
+ }
+
+ txnMgr.acquireLocks(plan, ctx, userFromUGI);
+
return 0;
} catch (LockException e) {
errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage();
@@ -900,13 +946,33 @@ public class Driver implements CommandPr
* @param hiveLocks
* list of hive locks to be released Release all the locks specified. If some of the
* locks have already been released, ignore them
+ * @param commit if there is an open transaction and if true, commit,
+ * if false rollback. If there is no open transaction this parameter is ignored.
+ *
**/
- private void releaseLocks(List<HiveLock> hiveLocks) throws LockException {
+ private void releaseLocksAndCommitOrRollback(List<HiveLock> hiveLocks, boolean commit)
+ throws LockException {
PerfLogger perfLogger = PerfLogger.getPerfLogger();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.RELEASE_LOCKS);
- if (hiveLocks != null) {
- SessionState.get().getTxnMgr().getLockManager().releaseLocks(hiveLocks);
+ SessionState ss = SessionState.get();
+ HiveTxnManager txnMgr = ss.getTxnMgr();
+ // If we've opened a transaction we need to commit or rollback rather than explicitly
+ // releasing the locks.
+ if (ss.getCurrentTxn() != SessionState.NO_CURRENT_TXN && ss.isAutoCommit()) {
+ try {
+ if (commit) {
+ txnMgr.commitTxn();
+ } else {
+ txnMgr.rollbackTxn();
+ }
+ } finally {
+ ss.setCurrentTxn(SessionState.NO_CURRENT_TXN);
+ }
+ } else {
+ if (hiveLocks != null) {
+ txnMgr.getLockManager().releaseLocks(hiveLocks);
+ }
}
ctx.setHiveLocks(null);
@@ -993,7 +1059,7 @@ public class Driver implements CommandPr
}
if (ret != 0) {
try {
- releaseLocks(ctx.getHiveLocks());
+ releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), false);
} catch (LockException e) {
LOG.warn("Exception in releasing locks. "
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
@@ -1096,10 +1162,10 @@ public class Driver implements CommandPr
}
if (requireLock) {
- ret = acquireReadWriteLocks();
+ ret = acquireLocksAndOpenTxn();
if (ret != 0) {
try {
- releaseLocks(ctx.getHiveLocks());
+ releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), false);
} catch (LockException e) {
// Not much to do here
}
@@ -1111,7 +1177,7 @@ public class Driver implements CommandPr
if (ret != 0) {
//if needRequireLock is false, the release here will do nothing because there is no lock
try {
- releaseLocks(ctx.getHiveLocks());
+ releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), false);
} catch (LockException e) {
// Nothing to do here
}
@@ -1120,7 +1186,7 @@ public class Driver implements CommandPr
//if needRequireLock is false, the release here will do nothing because there is no lock
try {
- releaseLocks(ctx.getHiveLocks());
+ releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), true);
} catch (LockException e) {
errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
SQLState = ErrorMsg.findSQLState(e.getMessage());
@@ -1523,10 +1589,17 @@ public class Driver implements CommandPr
cxt.launching(tskRun);
// Launch Task
- if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && tsk.isMapRedTask()) {
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL)
+ && (tsk.isMapRedTask() || (tsk instanceof MoveTask))) {
// Launch it in the parallel mode, as a separate thread only for MR tasks
+ if (LOG.isInfoEnabled()){
+ LOG.info("Starting task [" + tsk + "] in parallel");
+ }
tskRun.start();
} else {
+ if (LOG.isInfoEnabled()){
+ LOG.info("Starting task [" + tsk + "] in serial mode");
+ }
tskRun.runSequential();
}
return tskRun;
@@ -1658,7 +1731,7 @@ public class Driver implements CommandPr
destroyed = true;
if (ctx != null) {
try {
- releaseLocks(ctx.getHiveLocks());
+ releaseLocksAndCommitOrRollback(ctx.getHiveLocks(), false);
} catch (LockException e) {
LOG.warn("Exception when releasing locking in destroy: " +
e.getMessage());
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java Tue Sep 16 17:37:13 2014
@@ -404,6 +404,19 @@ public enum ErrorMsg {
"time."),
DISTINCT_NOT_SUPPORTED(10285, "Distinct keyword is not support in current context"),
+ UPDATEDELETE_PARSE_ERROR(10290, "Encountered parse error while parsing rewritten update or " +
+ "delete query"),
+ UPDATEDELETE_IO_ERROR(10291, "Encountered I/O error while parsing rewritten update or " +
+ "delete query"),
+ UPDATE_CANNOT_UPDATE_PART_VALUE(10292, "Updating values of partition columns is not supported"),
+ INSERT_CANNOT_CREATE_TEMP_FILE(10293, "Unable to create temp file for insert values "),
+ ACID_OP_ON_NONACID_TXNMGR(10294, "Attempt to do update or delete using transaction manager that" +
+ " does not support these operations."),
+ NO_INSERT_OVERWRITE_WITH_ACID(10295, "INSERT OVERWRITE not allowed on table with OutputFormat " +
+ "that implements AcidOutputFormat while transaction manager that supports ACID is in use"),
+ VALUES_TABLE_CONSTRUCTOR_NOT_SUPPORTED(10296,
+ "Values clause with table constructor not yet supported"),
+
//========================== 20000 range starts here ========================//
SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."),
SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. "
@@ -460,7 +473,10 @@ public enum ErrorMsg {
"to fail because of this, set hive.stats.atomic=false", true),
STATS_SKIPPING_BY_ERROR(30017, "Skipping stats aggregation by error {0}", true),
ORC_CORRUPTED_READ(30018, "Corruption in ORC data encountered. To skip reading corrupted "
- + "data, set " + HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA + " to true");
+ + "data, set " + HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA + " to true"),
+
+
+
;
private int errorCode;
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsTask.java Tue Sep 16 17:37:13 2014
@@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
@@ -305,8 +306,10 @@ public class ColumnStatsTask extends Tas
List<String> partVals = new ArrayList<String>();
// Iterate over partition columns to figure out partition name
for (int i = fields.size() - partColSchema.size(); i < fields.size(); i++) {
- partVals.add(((PrimitiveObjectInspector)fields.get(i).getFieldObjectInspector()).
- getPrimitiveJavaObject(list.get(i)).toString());
+ Object partVal = ((PrimitiveObjectInspector)fields.get(i).getFieldObjectInspector()).
+ getPrimitiveJavaObject(list.get(i));
+ partVals.add(partVal == null ? // could be null for default partition
+ this.conf.getVar(ConfVars.DEFAULTPARTITIONNAME) : partVal.toString());
}
partName = Warehouse.makePartName(partColSchema, partVals);
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Tue Sep 16 17:37:13 2014
@@ -18,32 +18,6 @@
package org.apache.hadoop.hive.ql.exec;
-import static org.apache.commons.lang.StringUtils.join;
-import static org.apache.hadoop.util.StringUtils.stringifyException;
-
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Serializable;
-import java.io.Writer;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@@ -88,8 +62,9 @@ import org.apache.hadoop.hive.ql.QueryPl
import org.apache.hadoop.hive.ql.exec.ArchiveUtils.PartSpecInfo;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.io.merge.MergeTask;
-import org.apache.hadoop.hive.ql.io.merge.MergeWork;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileTask;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateTask;
import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateWork;
@@ -133,14 +108,19 @@ import org.apache.hadoop.hive.ql.plan.De
import org.apache.hadoop.hive.ql.plan.DropDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.DropIndexDesc;
import org.apache.hadoop.hive.ql.plan.DropTableDesc;
+import org.apache.hadoop.hive.ql.plan.FileMergeDesc;
import org.apache.hadoop.hive.ql.plan.GrantDesc;
import org.apache.hadoop.hive.ql.plan.GrantRevokeRoleDDL;
+import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
import org.apache.hadoop.hive.ql.plan.LockDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.LockTableDesc;
import org.apache.hadoop.hive.ql.plan.MsckDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.OrcFileMergeDesc;
import org.apache.hadoop.hive.ql.plan.PrincipalDesc;
import org.apache.hadoop.hive.ql.plan.PrivilegeDesc;
import org.apache.hadoop.hive.ql.plan.PrivilegeObjectDesc;
+import org.apache.hadoop.hive.ql.plan.RCFileMergeDesc;
import org.apache.hadoop.hive.ql.plan.RenamePartitionDesc;
import org.apache.hadoop.hive.ql.plan.RevokeDesc;
import org.apache.hadoop.hive.ql.plan.RoleDDLDesc;
@@ -194,6 +174,33 @@ import org.apache.hadoop.util.ToolRunner
import org.apache.hive.common.util.AnnotationUtils;
import org.stringtemplate.v4.ST;
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.io.Writer;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import static org.apache.commons.lang.StringUtils.join;
+import static org.apache.hadoop.util.StringUtils.stringifyException;
+
/**
* DDLTask implementation.
*
@@ -546,15 +553,39 @@ public class DDLTask extends Task<DDLWor
*/
private int mergeFiles(Hive db, AlterTablePartMergeFilesDesc mergeFilesDesc)
throws HiveException {
+ ListBucketingCtx lbCtx = mergeFilesDesc.getLbCtx();
+ boolean lbatc = lbCtx == null ? false : lbCtx.isSkewedStoredAsDir();
+ int lbd = lbCtx == null ? 0 : lbCtx.calculateListBucketingLevel();
+
// merge work only needs input and output.
- MergeWork mergeWork = new MergeWork(mergeFilesDesc.getInputDir(),
- mergeFilesDesc.getOutputDir(), mergeFilesDesc.getInputFormatClass());
+ MergeFileWork mergeWork = new MergeFileWork(mergeFilesDesc.getInputDir(),
+ mergeFilesDesc.getOutputDir(), mergeFilesDesc.getInputFormatClass().getName());
mergeWork.setListBucketingCtx(mergeFilesDesc.getLbCtx());
mergeWork.resolveConcatenateMerge(db.getConf());
mergeWork.setMapperCannotSpanPartns(true);
- mergeWork.setSourceTableInputFormat(mergeFilesDesc.getInputFormatClass());
+ mergeWork.setSourceTableInputFormat(mergeFilesDesc.getInputFormatClass().getName());
+ final FileMergeDesc fmd;
+ if (mergeFilesDesc.getInputFormatClass().equals(RCFileInputFormat.class)) {
+ fmd = new RCFileMergeDesc();
+ } else {
+ // safe to assume else is ORC as semantic analyzer will check for RC/ORC
+ fmd = new OrcFileMergeDesc();
+ }
+
+ fmd.setDpCtx(null);
+ fmd.setHasDynamicPartitions(false);
+ fmd.setListBucketingAlterTableConcatenate(lbatc);
+ fmd.setListBucketingDepth(lbd);
+ fmd.setOutputPath(mergeFilesDesc.getOutputDir());
+
+ Operator<? extends OperatorDesc> mergeOp = OperatorFactory.get(fmd);
+
+ LinkedHashMap<String, Operator<? extends OperatorDesc>> aliasToWork =
+ new LinkedHashMap<String, Operator<? extends OperatorDesc>>();
+ aliasToWork.put(mergeFilesDesc.getInputDir().toString(), mergeOp);
+ mergeWork.setAliasToWork(aliasToWork);
DriverContext driverCxt = new DriverContext();
- MergeTask taskExec = new MergeTask();
+ MergeFileTask taskExec = new MergeFileTask();
taskExec.initialize(db.getConf(), null, driverCxt);
taskExec.setWork(mergeWork);
taskExec.setQueryPlan(this.getQueryPlan());
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Tue Sep 16 17:37:13 2014
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.HiveStatsUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.ql.Context;
@@ -35,8 +36,9 @@ import org.apache.hadoop.hive.ql.exec.mr
import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.DataContainer;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
-import org.apache.hadoop.hive.ql.io.merge.MergeTask;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileTask;
import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
@@ -47,7 +49,13 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.optimizer.physical.BucketingSortingCtx.BucketCol;
import org.apache.hadoop.hive.ql.optimizer.physical.BucketingSortingCtx.SortCol;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
-import org.apache.hadoop.hive.ql.plan.*;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
+import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
+import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.util.StringUtils;
@@ -55,7 +63,12 @@ import org.apache.hadoop.util.StringUtil
import java.io.IOException;
import java.io.Serializable;
import java.security.AccessControlException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
/**
* MoveTask implementation.
@@ -274,7 +287,8 @@ public class MoveTask extends Task<MoveW
dc = new DataContainer(table.getTTable());
db.loadTable(tbd.getSourcePath(), tbd.getTable()
.getTableName(), tbd.getReplace(), tbd.getHoldDDLTime(), work.isSrcLocal(),
- isSkewedStoredAsDirs(tbd));
+ isSkewedStoredAsDirs(tbd),
+ work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID);
if (work.getOutputs() != null) {
work.getOutputs().add(new WriteEntity(table,
(tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE :
@@ -294,7 +308,7 @@ public class MoveTask extends Task<MoveW
while (task.getParentTasks() != null && task.getParentTasks().size() == 1) {
task = (Task)task.getParentTasks().get(0);
// If it was a merge task or a local map reduce task, nothing can be inferred
- if (task instanceof MergeTask || task instanceof MapredLocalTask) {
+ if (task instanceof MergeFileTask || task instanceof MapredLocalTask) {
break;
}
@@ -354,7 +368,8 @@ public class MoveTask extends Task<MoveW
tbd.getReplace(),
dpCtx.getNumDPCols(),
tbd.getHoldDDLTime(),
- isSkewedStoredAsDirs(tbd));
+ isSkewedStoredAsDirs(tbd),
+ work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID);
if (dp.size() == 0 && conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) {
throw new HiveException("This query creates no partitions." +
@@ -389,7 +404,10 @@ public class MoveTask extends Task<MoveW
// update columnar lineage for each partition
dc = new DataContainer(table.getTTable(), partn.getTPartition());
- if (SessionState.get() != null) {
+ // Don't set lineage on delete as we don't have all the columns
+ if (SessionState.get() != null &&
+ work.getLoadTableWork().getWriteType() != AcidUtils.Operation.DELETE &&
+ work.getLoadTableWork().getWriteType() != AcidUtils.Operation.UPDATE) {
SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), dc,
table.getCols());
}
@@ -403,7 +421,8 @@ public class MoveTask extends Task<MoveW
db.validatePartitionNameCharacters(partVals);
db.loadPartition(tbd.getSourcePath(), tbd.getTable().getTableName(),
tbd.getPartitionSpec(), tbd.getReplace(), tbd.getHoldDDLTime(),
- tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), work.isSrcLocal());
+ tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), work.isSrcLocal(),
+ work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID);
Partition partn = db.getPartition(table, tbd.getPartitionSpec(),
false);
@@ -422,8 +441,24 @@ public class MoveTask extends Task<MoveW
}
}
if (SessionState.get() != null && dc != null) {
- SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), dc,
- table.getCols());
+ // If we are doing an update or a delete the number of columns in the table will not
+ // match the number of columns in the file sink. For update there will be one too many
+ // (because of the ROW__ID), and in the case of the delete there will be just the
+ // ROW__ID, which we don't need to worry about from a lineage perspective.
+ List<FieldSchema> tableCols = null;
+ switch (work.getLoadTableWork().getWriteType()) {
+ case DELETE:
+ case UPDATE:
+ // Pass an empty list as no columns will be written to the file.
+ // TODO I should be able to make this work for update
+ tableCols = new ArrayList<FieldSchema>();
+ break;
+
+ default:
+ tableCols = table.getCols();
+ break;
+ }
+ SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), dc, tableCols);
}
releaseLocks(tbd);
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Tue Sep 16 17:37:13 2014
@@ -18,10 +18,6 @@
package org.apache.hadoop.hive.ql.exec;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
import org.apache.hadoop.hive.ql.exec.vector.VectorExtractOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator;
@@ -53,7 +49,9 @@ import org.apache.hadoop.hive.ql.plan.Li
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.MuxDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.OrcFileMergeDesc;
import org.apache.hadoop.hive.ql.plan.PTFDesc;
+import org.apache.hadoop.hive.ql.plan.RCFileMergeDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
import org.apache.hadoop.hive.ql.plan.ScriptDesc;
@@ -62,6 +60,10 @@ import org.apache.hadoop.hive.ql.plan.Ta
import org.apache.hadoop.hive.ql.plan.UDTFDesc;
import org.apache.hadoop.hive.ql.plan.UnionDesc;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
/**
* OperatorFactory.
*
@@ -108,6 +110,10 @@ public final class OperatorFactory {
AppMasterEventOperator.class));
opvec.add(new OpTuple<DynamicPruningEventDesc>(DynamicPruningEventDesc.class,
AppMasterEventOperator.class));
+ opvec.add(new OpTuple<RCFileMergeDesc>(RCFileMergeDesc.class,
+ RCFileMergeOperator.class));
+ opvec.add(new OpTuple<OrcFileMergeDesc>(OrcFileMergeDesc.class,
+ OrcFileMergeOperator.class));
}
static {
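These registrations let OperatorFactory.get resolve the new merge descriptors to their operator classes, mirroring the call made from DDLTask.mergeFiles above. A small sketch, assuming the no-argument descriptor constructors used in that method:

import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.OrcFileMergeDesc;
import org.apache.hadoop.hive.ql.plan.RCFileMergeDesc;

public class MergeOperatorLookupSketch {
  public static void main(String[] args) {
    // The new OpTuple entries map the merge descriptors to their operators.
    Operator<? extends OperatorDesc> orcMerge = OperatorFactory.get(new OrcFileMergeDesc());
    Operator<? extends OperatorDesc> rcMerge = OperatorFactory.get(new RCFileMergeDesc());
    System.out.println(orcMerge.getClass().getSimpleName()); // expected OrcFileMergeOperator
    System.out.println(rcMerge.getClass().getSimpleName());  // expected RCFileMergeOperator
  }
}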
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Tue Sep 16 17:37:13 2014
@@ -29,6 +29,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -43,8 +44,10 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
@@ -131,10 +134,18 @@ public class ReduceSinkOperator extends
// TODO: we only ever use one row of these at a time. Why do we need to cache multiple?
protected transient Object[][] cachedKeys;
+ private StructField recIdField; // field to look for record identifier in
+ private StructField bucketField; // field to look for bucket in record identifier
+ private StructObjectInspector acidRowInspector; // row inspector used by acid options
+ private StructObjectInspector recIdInspector; // OI for the record identifier
+ private IntObjectInspector bucketInspector; // OI for the bucket field in the record id
+
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
try {
List<ExprNodeDesc> keys = conf.getKeyCols();
+ LOG.debug("keys size is " + keys.size());
+ for (ExprNodeDesc k : keys) LOG.debug("Key exprNodeDesc " + k.getExprString());
keyEval = new ExprNodeEvaluator[keys.size()];
int i = 0;
for (ExprNodeDesc e : keys) {
@@ -259,6 +270,20 @@ public class ReduceSinkOperator extends
// TODO: this is fishy - we init object inspectors based on first tag. We
// should either init for each tag, or if rowInspector doesn't really
// matter, then we can create this in ctor and get rid of firstRow.
+ if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
+ conf.getWriteType() == AcidUtils.Operation.DELETE) {
+ assert rowInspector instanceof StructObjectInspector :
+ "Exptected rowInspector to be instance of StructObjectInspector but it is a " +
+ rowInspector.getClass().getName();
+ acidRowInspector = (StructObjectInspector)rowInspector;
+ // The record identifier is always in the first column
+ recIdField = acidRowInspector.getAllStructFieldRefs().get(0);
+ recIdInspector = (StructObjectInspector)recIdField.getFieldObjectInspector();
+ // The bucket field is in the second position
+ bucketField = recIdInspector.getAllStructFieldRefs().get(1);
+ bucketInspector = (IntObjectInspector)bucketField.getFieldObjectInspector();
+ }
+
LOG.info("keys are " + conf.getOutputKeyColumnNames() + " num distributions: " + conf.getNumDistributionKeys());
keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval,
distinctColIndices,
@@ -283,6 +308,11 @@ public class ReduceSinkOperator extends
if (bucketEval != null) {
buckNum = computeBucketNumber(row, conf.getNumBuckets());
cachedKeys[0][buckColIdxInKey] = new IntWritable(buckNum);
+ } else if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
+ conf.getWriteType() == AcidUtils.Operation.DELETE) {
+ // In the non-partitioned case we still want to compute the bucket number for updates and
+ // deletes.
+ buckNum = computeBucketNumber(row, conf.getNumBuckets());
}
HiveKey firstKey = toHiveKey(cachedKeys[0], tag, null);
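
The inspector chain set up in initializeOp above lets computeBucketNumber read the bucket id straight out of ROW__ID: the record identifier is the row's first struct field, and the bucket sits in the second position inside it. A runnable sketch of pulling a nested int through that kind of ObjectInspector chain (standard OIs from the serde2 package; the three-field layout is the one implied by the comments above):

    import java.util.Arrays;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructField;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    // Sketch: read a bucket id out of a ROW__ID-like struct via inspectors.
    public class RowIdBucketSketch {
      public static void main(String[] args) {
        StructObjectInspector recIdOI =
            ObjectInspectorFactory.getStandardStructObjectInspector(
                Arrays.asList("transactionid", "bucketid", "rowid"),
                Arrays.<ObjectInspector>asList(
                    PrimitiveObjectInspectorFactory.javaLongObjectInspector,
                    PrimitiveObjectInspectorFactory.javaIntObjectInspector,
                    PrimitiveObjectInspectorFactory.javaLongObjectInspector));
        Object recId = Arrays.<Object>asList(100L, 7, 42L); // sample identifier
        StructField bucketField = recIdOI.getAllStructFieldRefs().get(1);
        IntObjectInspector bucketOI =
            (IntObjectInspector) bucketField.getFieldObjectInspector();
        int bucket = bucketOI.get(recIdOI.getStructFieldData(recId, bucketField));
        System.out.println("bucket = " + bucket); // prints 7
      }
    }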
@@ -339,9 +369,20 @@ public class ReduceSinkOperator extends
private int computeBucketNumber(Object row, int numBuckets) throws HiveException {
int buckNum = 0;
- for (int i = 0; i < bucketEval.length; i++) {
- Object o = bucketEval[i].evaluate(row);
- buckNum = buckNum * 31 + ObjectInspectorUtils.hashCode(o, bucketObjectInspectors[i]);
+
+ if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
+ conf.getWriteType() == AcidUtils.Operation.DELETE) {
+ // We don't need to evaluate the hash code. Instead, read the bucket number directly
+ // from the row. No expressions need evaluating here, since this reads the ROW__ID
+ // column directly.
+ Object recIdValue = acidRowInspector.getStructFieldData(row, recIdField);
+ buckNum = bucketInspector.get(recIdInspector.getStructFieldData(recIdValue, bucketField));
+ LOG.debug("Acid choosing bucket number " + buckNum);
+ } else {
+ for (int i = 0; i < bucketEval.length; i++) {
+ Object o = bucketEval[i].evaluate(row);
+ buckNum = buckNum * 31 + ObjectInspectorUtils.hashCode(o, bucketObjectInspectors[i]);
+ }
}
if (buckNum < 0) {
@@ -385,14 +426,19 @@ public class ReduceSinkOperator extends
// Evaluate the HashCode
int keyHashCode = 0;
if (partitionEval.length == 0) {
- // If no partition cols, just distribute the data uniformly to provide better
- // load balance. If the requirement is to have a single reducer, we should set
- // the number of reducers to 1.
- // Use a constant seed to make the code deterministic.
- if (random == null) {
- random = new Random(12345);
+ // If no partition cols and not doing an update or delete, just distribute the data uniformly
+ // to provide better load balance. If the requirement is to have a single reducer, we should
+ // set the number of reducers to 1. Use a constant seed to make the code deterministic.
+ // For acid operations make sure to send all records with the same key to the same
+ // FileSinkOperator, as the RecordUpdater interface can't manage multiple writers for a file.
+ if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID) {
+ if (random == null) {
+ random = new Random(12345);
+ }
+ keyHashCode = random.nextInt();
+ } else {
+ keyHashCode = 1;
}
- keyHashCode = random.nextInt();
} else {
for (int i = 0; i < partitionEval.length; i++) {
Object o = partitionEval[i].evaluate(row);
@@ -400,6 +446,7 @@ public class ReduceSinkOperator extends
+ ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
}
}
+ LOG.debug("Going to return hash code " + (keyHashCode * 31 + buckNum));
return buckNum < 0 ? keyHashCode : keyHashCode * 31 + buckNum;
}
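
Two routing rules interact in the hunks above: for UPDATE/DELETE the bucket number is read from ROW__ID rather than hashed, and when there are no partition columns the key hash becomes a constant instead of random, so every record for a given key reaches the same FileSinkOperator (RecordUpdater cannot manage multiple writers per file). A compact sketch of that decision, mirroring the diff (hypothetical standalone helper):

    // Sketch of the key-hash routing above. Acid writes need a stable hash;
    // everything else gets a seeded Random for uniform load balance.
    import java.util.Random;

    final class AcidKeyHashSketch {
      private static Random random;

      static int hashFor(boolean acidWrite, boolean hasPartitionCols,
                         int partitionHash, int buckNum) {
        int keyHashCode;
        if (!hasPartitionCols) {
          if (acidWrite) {
            keyHashCode = 1;                // constant: one key -> one reducer
          } else {
            if (random == null) {
              random = new Random(12345);   // constant seed keeps runs deterministic
            }
            keyHashCode = random.nextInt(); // spread rows uniformly
          }
        } else {
          keyHashCode = partitionHash;      // hash of the partition columns
        }
        return buckNum < 0 ? keyHashCode : keyHashCode * 31 + buckNum;
      }
    }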
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java Tue Sep 16 17:37:13 2014
@@ -28,8 +28,8 @@ import org.apache.hadoop.hive.ql.exec.mr
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.index.IndexMetadataChangeTask;
import org.apache.hadoop.hive.ql.index.IndexMetadataChangeWork;
-import org.apache.hadoop.hive.ql.io.merge.MergeTask;
-import org.apache.hadoop.hive.ql.io.merge.MergeWork;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileTask;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanTask;
import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork;
import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork;
@@ -94,8 +94,8 @@ public final class TaskFactory {
taskvec.add(new TaskTuple<StatsNoJobWork>(StatsNoJobWork.class, StatsNoJobTask.class));
taskvec.add(new TaskTuple<ColumnStatsWork>(ColumnStatsWork.class, ColumnStatsTask.class));
taskvec.add(new TaskTuple<ColumnStatsUpdateWork>(ColumnStatsUpdateWork.class, ColumnStatsUpdateTask.class));
- taskvec.add(new TaskTuple<MergeWork>(MergeWork.class,
- MergeTask.class));
+ taskvec.add(new TaskTuple<MergeFileWork>(MergeFileWork.class,
+ MergeFileTask.class));
taskvec.add(new TaskTuple<DependencyCollectionWork>(DependencyCollectionWork.class,
DependencyCollectionTask.class));
taskvec.add(new TaskTuple<PartialScanWork>(PartialScanWork.class,
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Tue Sep 16 17:37:13 2014
@@ -18,67 +18,11 @@
package org.apache.hadoop.hive.ql.exec;
-import java.beans.DefaultPersistenceDelegate;
-import java.beans.Encoder;
-import java.beans.ExceptionListener;
-import java.beans.Expression;
-import java.beans.PersistenceDelegate;
-import java.beans.Statement;
-import java.beans.XMLDecoder;
-import java.beans.XMLEncoder;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.sql.SQLTransientException;
-import java.sql.Timestamp;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.zip.Deflater;
-import java.util.zip.DeflaterOutputStream;
-import java.util.zip.InflaterInputStream;
-
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.esotericsoftware.shaded.org.objenesis.strategy.StdInstantiatorStrategy;
import org.antlr.runtime.CommonToken;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.StringUtils;
@@ -122,9 +66,8 @@ import org.apache.hadoop.hive.ql.io.Hive
import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat;
-import org.apache.hadoop.hive.ql.io.merge.MergeWork;
-import org.apache.hadoop.hive.ql.io.orc.OrcFileMergeMapper;
-import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileMergeMapper;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanMapper;
import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork;
import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateMapper;
@@ -181,11 +124,66 @@ import org.apache.hadoop.util.Progressab
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Shell;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.esotericsoftware.kryo.serializers.FieldSerializer;
-import com.esotericsoftware.shaded.org.objenesis.strategy.StdInstantiatorStrategy;
+import java.beans.DefaultPersistenceDelegate;
+import java.beans.Encoder;
+import java.beans.ExceptionListener;
+import java.beans.Expression;
+import java.beans.PersistenceDelegate;
+import java.beans.Statement;
+import java.beans.XMLDecoder;
+import java.beans.XMLEncoder;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.SQLTransientException;
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
/**
* Utilities.
@@ -352,9 +350,8 @@ public final class Utilities {
if(MAP_PLAN_NAME.equals(name)){
if (ExecMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))){
gWork = deserializePlan(in, MapWork.class, conf);
- } else if(RCFileMergeMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS)) ||
- OrcFileMergeMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
- gWork = deserializePlan(in, MergeWork.class, conf);
+ } else if(MergeFileMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
+ gWork = deserializePlan(in, MergeFileWork.class, conf);
} else if(ColumnTruncateMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
gWork = deserializePlan(in, ColumnTruncateWork.class, conf);
} else if(PartialScanMapper.class.getName().equals(conf.get(MAPRED_MAPPER_CLASS))) {
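
The Utilities hunk above is the payoff of the merge-file consolidation: one MergeFileMapper entry replaces the RCFile- and ORC-specific mappers, and it maps to the new MergeFileWork plan class. Reduced to a standalone sketch (class names kept as strings so nothing from Hive is needed on the classpath; the helper itself is hypothetical):

    // Sketch of the mapper-class -> plan-class dispatch after the change.
    final class PlanClassDispatchSketch {
      static String planClassFor(String mapperClassName) {
        if ("org.apache.hadoop.hive.ql.exec.mr.ExecMapper".equals(mapperClassName)) {
          return "org.apache.hadoop.hive.ql.plan.MapWork";
        } else if ("org.apache.hadoop.hive.ql.io.merge.MergeFileMapper"
            .equals(mapperClassName)) {
          return "org.apache.hadoop.hive.ql.io.merge.MergeFileWork";
        }
        throw new IllegalArgumentException("Unexpected mapper: " + mapperClassName);
      }
    }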
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java Tue Sep 16 17:37:13 2014
@@ -26,7 +26,6 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.tez.dag.api.EdgeManagerPlugin;
import org.apache.tez.dag.api.EdgeManagerPluginContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Tue Sep 16 17:37:13 2014
@@ -17,22 +17,9 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import javax.security.auth.login.LoginException;
-
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@@ -54,6 +41,9 @@ import org.apache.hadoop.hive.ql.io.Comb
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileOutputFormat;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -68,6 +58,7 @@ import org.apache.hadoop.hive.shims.Hado
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
@@ -113,9 +104,20 @@ import org.apache.tez.runtime.library.co
import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
+import javax.security.auth.login.LoginException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
/**
* DagUtils. DagUtils is a collection of helper methods to convert
@@ -212,6 +214,16 @@ public class DagUtils {
conf.set("mapred.mapper.class", ExecMapper.class.getName());
conf.set("mapred.input.format.class", inpFormat);
+ if (mapWork instanceof MergeFileWork) {
+ MergeFileWork mfWork = (MergeFileWork) mapWork;
+ // This mapper class is used for serialization/deserialization of merge
+ // file work.
+ conf.set("mapred.mapper.class", MergeFileMapper.class.getName());
+ conf.set("mapred.input.format.class", mfWork.getInputformat());
+ conf.setClass("mapred.output.format.class", MergeFileOutputFormat.class,
+ FileOutputFormat.class);
+ }
+
return conf;
}
@@ -486,6 +498,21 @@ public class DagUtils {
}
}
+ if (mapWork instanceof MergeFileWork) {
+ Path outputPath = ((MergeFileWork) mapWork).getOutputDir();
+ // prepare the tmp output directory. The output tmp directory should
+ // exist before jobClose (before renaming after job completion)
+ Path tempOutPath = Utilities.toTempPath(outputPath);
+ try {
+ if (!fs.exists(tempOutPath)) {
+ fs.mkdirs(tempOutPath);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Can't make path " + outputPath + " : " + e.getMessage());
+ }
+ }
+
if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION)
&& !mapWork.isUseOneNullRowInputFormat()) {
@@ -515,9 +542,13 @@ public class DagUtils {
}
UserPayload serializedConf = TezUtils.createUserPayloadFromConf(conf);
- map = Vertex.create(mapWork.getName(),
- ProcessorDescriptor.create(MapTezProcessor.class.getName()).
- setUserPayload(serializedConf), numTasks, getContainerResource(conf));
+ String procClassName = MapTezProcessor.class.getName();
+ if (mapWork instanceof MergeFileWork) {
+ procClassName = MergeFileTezProcessor.class.getName();
+ }
+ map = Vertex.create(mapWork.getName(), ProcessorDescriptor.create(procClassName)
+ .setUserPayload(serializedConf), numTasks, getContainerResource(conf));
+
map.setTaskEnvironment(getContainerEnvironment(conf, true));
map.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
@@ -784,7 +815,7 @@ public class DagUtils {
}
/**
- * @param path - the path from which we try to determine the resource base name
+ * @param path - the string from which we try to determine the resource base name
* @return the name of the resource from a given path string.
*/
public String getResourceBaseName(Path path) {
@@ -831,7 +862,8 @@ public class DagUtils {
conf.getInt(HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS.varname,
HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS.defaultIntVal);
long sleepInterval = HiveConf.getTimeVar(
- conf, HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL, TimeUnit.MILLISECONDS);
+ conf, HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL,
+ TimeUnit.MILLISECONDS);
LOG.info("Number of wait attempts: " + waitAttempts + ". Wait interval: "
+ sleepInterval);
boolean found = false;
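
Taken together, the DagUtils hunks above wire up a merge-file vertex: the JobConf swaps in MergeFileMapper and MergeFileOutputFormat, the temporary output directory is created before jobClose needs to rename into it, and the vertex is built with MergeFileTezProcessor instead of MapTezProcessor. A minimal sketch of the conf wiring, using the same mapred.* keys (class objects replaced with name strings so the sketch compiles without the Hive classes):

    import org.apache.hadoop.mapred.JobConf;

    final class MergeVertexConfSketch {
      // Mirrors the conf.set calls above for a MergeFileWork vertex.
      static void configureMergeVertex(JobConf conf, String mergeInputFormat) {
        conf.set("mapred.mapper.class",
            "org.apache.hadoop.hive.ql.io.merge.MergeFileMapper");
        conf.set("mapred.input.format.class", mergeInputFormat);
        conf.set("mapred.output.format.class",
            "org.apache.hadoop.hive.ql.io.merge.MergeFileOutputFormat");
      }
    }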
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java Tue Sep 16 17:37:13 2014
@@ -16,13 +16,8 @@
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.exec.tez;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-import java.net.URLClassLoader;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.Map.Entry;
-
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
@@ -34,8 +29,12 @@ import org.apache.tez.runtime.api.Logica
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ProcessorContext;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Map.Entry;
/**
* Process input from tez LogicalInput and write output
@@ -66,7 +65,7 @@ public abstract class RecordProcessor {
/**
* Common initialization code for RecordProcessors
* @param jconf
- * @param processorContext the {@link TezProcessorContext}
+ * @param processorContext the {@link ProcessorContext}
* @param mrReporter
* @param inputs map of Input names to {@link LogicalInput}s
* @param outputs map of Output names to {@link LogicalOutput}s
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Tue Sep 16 17:37:13 2014
@@ -17,12 +17,6 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
-import java.io.IOException;
-import java.text.NumberFormat;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -40,6 +34,11 @@ import org.apache.tez.runtime.api.Logica
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.KeyValueWriter;
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.List;
+import java.util.Map;
+
/**
* Hive processor for Tez that forms the vertices in Tez and processes the data.
* Does what ExecMapper and ExecReducer does for hive in MR framework.
@@ -51,13 +50,15 @@ public class TezProcessor extends Abstra
private static final Log LOG = LogFactory.getLog(TezProcessor.class);
protected boolean isMap = false;
- RecordProcessor rproc = null;
+ protected RecordProcessor rproc = null;
- private JobConf jobConf;
+ protected JobConf jobConf;
private static final String CLASS_NAME = TezProcessor.class.getName();
private final PerfLogger perfLogger = PerfLogger.getPerfLogger();
+ protected ProcessorContext processorContext;
+
protected static final NumberFormat taskIdFormat = NumberFormat.getInstance();
protected static final NumberFormat jobIdFormat = NumberFormat.getInstance();
static {
@@ -121,9 +122,6 @@ public class TezProcessor extends Abstra
public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs)
throws Exception {
- Throwable originalThrowable = null;
-
- try{
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR);
// in case of broadcast-join read the broadcast edge inputs
// (possibly asynchronously)
@@ -142,14 +140,23 @@ public class TezProcessor extends Abstra
rproc = new ReduceRecordProcessor();
}
+ initializeAndRunProcessor(inputs, outputs);
+ }
+
+ protected void initializeAndRunProcessor(Map<String, LogicalInput> inputs,
+ Map<String, LogicalOutput> outputs)
+ throws Exception {
+ Throwable originalThrowable = null;
+ try {
TezCacheAccess cacheAccess = TezCacheAccess.createInstance(jobConf);
// Start the actual Inputs. After MRInput initialization.
- for (Entry<String, LogicalInput> inputEntry : inputs.entrySet()) {
+ for (Map.Entry<String, LogicalInput> inputEntry : inputs.entrySet()) {
if (!cacheAccess.isInputCached(inputEntry.getKey())) {
LOG.info("Input: " + inputEntry.getKey() + " is not cached");
inputEntry.getValue().start();
} else {
- LOG.info("Input: " + inputEntry.getKey() + " is already cached. Skipping start");
+ LOG.info("Input: " + inputEntry.getKey() +
+ " is already cached. Skipping start");
}
}
@@ -170,7 +177,7 @@ public class TezProcessor extends Abstra
}
try {
- if(rproc != null){
+ if (rproc != null) {
rproc.close();
}
} catch (Throwable t) {
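
The TezProcessor changes above open an extension seam: rproc and jobConf become protected, a protected processorContext appears, and the run logic moves into a reusable initializeAndRunProcessor. That lets a subclass (DagUtils above selects MergeFileTezProcessor for merge vertices) install its own RecordProcessor and reuse the common start/run/close path. The shape of that pattern, as a purely illustrative sketch:

    // Shape-only sketch of the refactor: the base class owns the shared
    // initialize/run/close path; a subclass only chooses the record processor.
    abstract class BaseProcessorSketch {
      protected Object rproc; // stands in for RecordProcessor

      protected void initializeAndRunProcessor() throws Exception {
        // shared: start inputs, init rproc, process, close, error handling
      }
    }

    final class MergeProcessorSketch extends BaseProcessorSketch {
      void run() throws Exception {
        rproc = new Object(); // a merge-specific record processor in real code
        initializeAndRunProcessor();
      }
    }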
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java?rev=1625341&r1=1625340&r2=1625341&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java Tue Sep 16 17:37:13 2014
@@ -428,8 +428,12 @@ public class VectorColumnAssignFactory {
assignNull(destIndex);
}
else {
- HiveDecimalWritable hdw = (HiveDecimalWritable) val;
- assignDecimal(hdw, destIndex);
+ if (val instanceof HiveDecimal) {
+ assignDecimal((HiveDecimal) val, destIndex);
+ } else {
+ assignDecimal((HiveDecimalWritable) val, destIndex);
+ }
+
}
}
}.init(outputBatch, (DecimalColumnVector) destCol);
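
The VectorColumnAssignFactory fix above accounts for decimal values arriving either as a plain HiveDecimal or wrapped in a HiveDecimalWritable; the old unconditional cast would have thrown a ClassCastException on the plain form. The dispatch, reduced to a standalone sketch with stub types standing in for the two Hive classes:

    // Sketch of the instanceof dispatch above; Dec/DecWritable are stubs for
    // HiveDecimal/HiveDecimalWritable, which are unrelated types in Hive.
    final class DecimalAssignSketch {
      static class Dec {}
      static class DecWritable {}

      static String assign(Object val) {
        if (val == null) {
          return "assignNull";
        } else if (val instanceof Dec) {
          return "assignDecimal(plain)";
        } else if (val instanceof DecWritable) {
          return "assignDecimal(writable)";
        }
        throw new IllegalArgumentException("Unexpected " + val.getClass().getName());
      }
    }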