Posted to commits@hive.apache.org by se...@apache.org on 2016/11/15 20:21:14 UTC
[42/50] [abbrv] hive git commit: HIVE-14671 : merge master into hive-14535 (exim conflicts merge 1) (Sergey Shelukhin)
http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
----------------------------------------------------------------------
diff --cc metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index b90de43,d420f06..b2e2484
--- a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@@ -12733,251 -12680,58 +12744,668 @@@ class GetValidWriteIdsRequest
if ftype == TType.STOP:
break
if fid == 1:
- if ftype == TType.LIST:
- self.tables = []
- (_etype583, _size580) = iprot.readListBegin()
- for _i584 in xrange(_size580):
- _elem585 = Table()
- _elem585.read(iprot)
- self.tables.append(_elem585)
+ if ftype == TType.STRING:
+ self.dbName = iprot.readString()
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.STRING:
+ self.tblName = iprot.readString()
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('GetValidWriteIdsRequest')
+ if self.dbName is not None:
+ oprot.writeFieldBegin('dbName', TType.STRING, 1)
+ oprot.writeString(self.dbName)
+ oprot.writeFieldEnd()
+ if self.tblName is not None:
+ oprot.writeFieldBegin('tblName', TType.STRING, 2)
+ oprot.writeString(self.tblName)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ if self.dbName is None:
+ raise TProtocol.TProtocolException(message='Required field dbName is unset!')
+ if self.tblName is None:
+ raise TProtocol.TProtocolException(message='Required field tblName is unset!')
+ return
+
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.dbName)
+ value = (value * 31) ^ hash(self.tblName)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class GetValidWriteIdsResult:
+ """
+ Attributes:
+ - lowWatermarkId
+ - highWatermarkId
+ - areIdsValid
+ - ids
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.I64, 'lowWatermarkId', None, None, ), # 1
+ (2, TType.I64, 'highWatermarkId', None, None, ), # 2
+ (3, TType.BOOL, 'areIdsValid', None, None, ), # 3
+ (4, TType.LIST, 'ids', (TType.I64,None), None, ), # 4
+ )
+
+ def __init__(self, lowWatermarkId=None, highWatermarkId=None, areIdsValid=None, ids=None,):
+ self.lowWatermarkId = lowWatermarkId
+ self.highWatermarkId = highWatermarkId
+ self.areIdsValid = areIdsValid
+ self.ids = ids
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.I64:
+ self.lowWatermarkId = iprot.readI64()
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.I64:
+ self.highWatermarkId = iprot.readI64()
+ else:
+ iprot.skip(ftype)
+ elif fid == 3:
+ if ftype == TType.BOOL:
+ self.areIdsValid = iprot.readBool()
+ else:
+ iprot.skip(ftype)
+ elif fid == 4:
+ if ftype == TType.LIST:
+ self.ids = []
+ (_etype562, _size559) = iprot.readListBegin()
+ for _i563 in xrange(_size559):
+ _elem564 = iprot.readI64()
+ self.ids.append(_elem564)
+ iprot.readListEnd()
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('GetValidWriteIdsResult')
+ if self.lowWatermarkId is not None:
+ oprot.writeFieldBegin('lowWatermarkId', TType.I64, 1)
+ oprot.writeI64(self.lowWatermarkId)
+ oprot.writeFieldEnd()
+ if self.highWatermarkId is not None:
+ oprot.writeFieldBegin('highWatermarkId', TType.I64, 2)
+ oprot.writeI64(self.highWatermarkId)
+ oprot.writeFieldEnd()
+ if self.areIdsValid is not None:
+ oprot.writeFieldBegin('areIdsValid', TType.BOOL, 3)
+ oprot.writeBool(self.areIdsValid)
+ oprot.writeFieldEnd()
+ if self.ids is not None:
+ oprot.writeFieldBegin('ids', TType.LIST, 4)
+ oprot.writeListBegin(TType.I64, len(self.ids))
+ for iter565 in self.ids:
+ oprot.writeI64(iter565)
+ oprot.writeListEnd()
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ if self.lowWatermarkId is None:
+ raise TProtocol.TProtocolException(message='Required field lowWatermarkId is unset!')
+ if self.highWatermarkId is None:
+ raise TProtocol.TProtocolException(message='Required field highWatermarkId is unset!')
+ return
+
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.lowWatermarkId)
+ value = (value * 31) ^ hash(self.highWatermarkId)
+ value = (value * 31) ^ hash(self.areIdsValid)
+ value = (value * 31) ^ hash(self.ids)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class GetAllFunctionsResponse:
+ """
+ Attributes:
+ - functions
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.LIST, 'functions', (TType.STRUCT,(Function, Function.thrift_spec)), None, ), # 1
+ )
+
+ def __init__(self, functions=None,):
+ self.functions = functions
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.LIST:
+ self.functions = []
+ (_etype569, _size566) = iprot.readListBegin()
+ for _i570 in xrange(_size566):
+ _elem571 = Function()
+ _elem571.read(iprot)
+ self.functions.append(_elem571)
+ iprot.readListEnd()
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('GetAllFunctionsResponse')
+ if self.functions is not None:
+ oprot.writeFieldBegin('functions', TType.LIST, 1)
+ oprot.writeListBegin(TType.STRUCT, len(self.functions))
+ for iter572 in self.functions:
+ iter572.write(oprot)
+ oprot.writeListEnd()
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ return
+
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.functions)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
++class ClientCapabilities:
++ """
++ Attributes:
++ - values
++ """
++
++ thrift_spec = (
++ None, # 0
++ (1, TType.LIST, 'values', (TType.I32,None), None, ), # 1
++ )
++
++ def __init__(self, values=None,):
++ self.values = values
++
++ def read(self, iprot):
++ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
++ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
++ return
++ iprot.readStructBegin()
++ while True:
++ (fname, ftype, fid) = iprot.readFieldBegin()
++ if ftype == TType.STOP:
++ break
++ if fid == 1:
++ if ftype == TType.LIST:
++ self.values = []
++ (_etype576, _size573) = iprot.readListBegin()
++ for _i577 in xrange(_size573):
++ _elem578 = iprot.readI32()
++ self.values.append(_elem578)
++ iprot.readListEnd()
++ else:
++ iprot.skip(ftype)
++ else:
++ iprot.skip(ftype)
++ iprot.readFieldEnd()
++ iprot.readStructEnd()
++
++ def write(self, oprot):
++ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
++ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
++ return
++ oprot.writeStructBegin('ClientCapabilities')
++ if self.values is not None:
++ oprot.writeFieldBegin('values', TType.LIST, 1)
++ oprot.writeListBegin(TType.I32, len(self.values))
++ for iter579 in self.values:
++ oprot.writeI32(iter579)
++ oprot.writeListEnd()
++ oprot.writeFieldEnd()
++ oprot.writeFieldStop()
++ oprot.writeStructEnd()
++
++ def validate(self):
++ if self.values is None:
++ raise TProtocol.TProtocolException(message='Required field values is unset!')
++ return
++
++
++ def __hash__(self):
++ value = 17
++ value = (value * 31) ^ hash(self.values)
++ return value
++
++ def __repr__(self):
++ L = ['%s=%r' % (key, value)
++ for key, value in self.__dict__.iteritems()]
++ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
++
++ def __eq__(self, other):
++ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
++
++ def __ne__(self, other):
++ return not (self == other)
++
++class GetTableRequest:
++ """
++ Attributes:
++ - dbName
++ - tblName
++ - capabilities
++ """
++
++ thrift_spec = (
++ None, # 0
++ (1, TType.STRING, 'dbName', None, None, ), # 1
++ (2, TType.STRING, 'tblName', None, None, ), # 2
++ (3, TType.STRUCT, 'capabilities', (ClientCapabilities, ClientCapabilities.thrift_spec), None, ), # 3
++ )
++
++ def __init__(self, dbName=None, tblName=None, capabilities=None,):
++ self.dbName = dbName
++ self.tblName = tblName
++ self.capabilities = capabilities
++
++ def read(self, iprot):
++ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
++ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
++ return
++ iprot.readStructBegin()
++ while True:
++ (fname, ftype, fid) = iprot.readFieldBegin()
++ if ftype == TType.STOP:
++ break
++ if fid == 1:
++ if ftype == TType.STRING:
++ self.dbName = iprot.readString()
++ else:
++ iprot.skip(ftype)
++ elif fid == 2:
++ if ftype == TType.STRING:
++ self.tblName = iprot.readString()
++ else:
++ iprot.skip(ftype)
++ elif fid == 3:
++ if ftype == TType.STRUCT:
++ self.capabilities = ClientCapabilities()
++ self.capabilities.read(iprot)
++ else:
++ iprot.skip(ftype)
++ else:
++ iprot.skip(ftype)
++ iprot.readFieldEnd()
++ iprot.readStructEnd()
++
++ def write(self, oprot):
++ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
++ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
++ return
++ oprot.writeStructBegin('GetTableRequest')
++ if self.dbName is not None:
++ oprot.writeFieldBegin('dbName', TType.STRING, 1)
++ oprot.writeString(self.dbName)
++ oprot.writeFieldEnd()
++ if self.tblName is not None:
++ oprot.writeFieldBegin('tblName', TType.STRING, 2)
++ oprot.writeString(self.tblName)
++ oprot.writeFieldEnd()
++ if self.capabilities is not None:
++ oprot.writeFieldBegin('capabilities', TType.STRUCT, 3)
++ self.capabilities.write(oprot)
++ oprot.writeFieldEnd()
++ oprot.writeFieldStop()
++ oprot.writeStructEnd()
++
++ def validate(self):
++ if self.dbName is None:
++ raise TProtocol.TProtocolException(message='Required field dbName is unset!')
++ if self.tblName is None:
++ raise TProtocol.TProtocolException(message='Required field tblName is unset!')
++ return
++
++
++ def __hash__(self):
++ value = 17
++ value = (value * 31) ^ hash(self.dbName)
++ value = (value * 31) ^ hash(self.tblName)
++ value = (value * 31) ^ hash(self.capabilities)
++ return value
++
++ def __repr__(self):
++ L = ['%s=%r' % (key, value)
++ for key, value in self.__dict__.iteritems()]
++ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
++
++ def __eq__(self, other):
++ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
++
++ def __ne__(self, other):
++ return not (self == other)
++
++class GetTableResult:
++ """
++ Attributes:
++ - table
++ """
++
++ thrift_spec = (
++ None, # 0
++ (1, TType.STRUCT, 'table', (Table, Table.thrift_spec), None, ), # 1
++ )
++
++ def __init__(self, table=None,):
++ self.table = table
++
++ def read(self, iprot):
++ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
++ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
++ return
++ iprot.readStructBegin()
++ while True:
++ (fname, ftype, fid) = iprot.readFieldBegin()
++ if ftype == TType.STOP:
++ break
++ if fid == 1:
++ if ftype == TType.STRUCT:
++ self.table = Table()
++ self.table.read(iprot)
++ else:
++ iprot.skip(ftype)
++ else:
++ iprot.skip(ftype)
++ iprot.readFieldEnd()
++ iprot.readStructEnd()
++
++ def write(self, oprot):
++ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
++ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
++ return
++ oprot.writeStructBegin('GetTableResult')
++ if self.table is not None:
++ oprot.writeFieldBegin('table', TType.STRUCT, 1)
++ self.table.write(oprot)
++ oprot.writeFieldEnd()
++ oprot.writeFieldStop()
++ oprot.writeStructEnd()
++
++ def validate(self):
++ if self.table is None:
++ raise TProtocol.TProtocolException(message='Required field table is unset!')
++ return
++
++
++ def __hash__(self):
++ value = 17
++ value = (value * 31) ^ hash(self.table)
++ return value
++
++ def __repr__(self):
++ L = ['%s=%r' % (key, value)
++ for key, value in self.__dict__.iteritems()]
++ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
++
++ def __eq__(self, other):
++ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
++
++ def __ne__(self, other):
++ return not (self == other)
++
++class GetTablesRequest:
++ """
++ Attributes:
++ - dbName
++ - tblNames
++ - capabilities
++ """
++
++ thrift_spec = (
++ None, # 0
++ (1, TType.STRING, 'dbName', None, None, ), # 1
++ (2, TType.LIST, 'tblNames', (TType.STRING,None), None, ), # 2
++ (3, TType.STRUCT, 'capabilities', (ClientCapabilities, ClientCapabilities.thrift_spec), None, ), # 3
++ )
++
++ def __init__(self, dbName=None, tblNames=None, capabilities=None,):
++ self.dbName = dbName
++ self.tblNames = tblNames
++ self.capabilities = capabilities
++
++ def read(self, iprot):
++ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
++ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
++ return
++ iprot.readStructBegin()
++ while True:
++ (fname, ftype, fid) = iprot.readFieldBegin()
++ if ftype == TType.STOP:
++ break
++ if fid == 1:
++ if ftype == TType.STRING:
++ self.dbName = iprot.readString()
++ else:
++ iprot.skip(ftype)
++ elif fid == 2:
++ if ftype == TType.LIST:
++ self.tblNames = []
++ (_etype583, _size580) = iprot.readListBegin()
++ for _i584 in xrange(_size580):
++ _elem585 = iprot.readString()
++ self.tblNames.append(_elem585)
++ iprot.readListEnd()
++ else:
++ iprot.skip(ftype)
++ elif fid == 3:
++ if ftype == TType.STRUCT:
++ self.capabilities = ClientCapabilities()
++ self.capabilities.read(iprot)
++ else:
++ iprot.skip(ftype)
++ else:
++ iprot.skip(ftype)
++ iprot.readFieldEnd()
++ iprot.readStructEnd()
++
++ def write(self, oprot):
++ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
++ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
++ return
++ oprot.writeStructBegin('GetTablesRequest')
++ if self.dbName is not None:
++ oprot.writeFieldBegin('dbName', TType.STRING, 1)
++ oprot.writeString(self.dbName)
++ oprot.writeFieldEnd()
++ if self.tblNames is not None:
++ oprot.writeFieldBegin('tblNames', TType.LIST, 2)
++ oprot.writeListBegin(TType.STRING, len(self.tblNames))
++ for iter586 in self.tblNames:
++ oprot.writeString(iter586)
++ oprot.writeListEnd()
++ oprot.writeFieldEnd()
++ if self.capabilities is not None:
++ oprot.writeFieldBegin('capabilities', TType.STRUCT, 3)
++ self.capabilities.write(oprot)
++ oprot.writeFieldEnd()
++ oprot.writeFieldStop()
++ oprot.writeStructEnd()
++
++ def validate(self):
++ if self.dbName is None:
++ raise TProtocol.TProtocolException(message='Required field dbName is unset!')
++ return
++
++
++ def __hash__(self):
++ value = 17
++ value = (value * 31) ^ hash(self.dbName)
++ value = (value * 31) ^ hash(self.tblNames)
++ value = (value * 31) ^ hash(self.capabilities)
++ return value
++
++ def __repr__(self):
++ L = ['%s=%r' % (key, value)
++ for key, value in self.__dict__.iteritems()]
++ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
++
++ def __eq__(self, other):
++ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
++
++ def __ne__(self, other):
++ return not (self == other)
++
++class GetTablesResult:
++ """
++ Attributes:
++ - tables
++ """
++
++ thrift_spec = (
++ None, # 0
++ (1, TType.LIST, 'tables', (TType.STRUCT,(Table, Table.thrift_spec)), None, ), # 1
++ )
++
++ def __init__(self, tables=None,):
++ self.tables = tables
++
++ def read(self, iprot):
++ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
++ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
++ return
++ iprot.readStructBegin()
++ while True:
++ (fname, ftype, fid) = iprot.readFieldBegin()
++ if ftype == TType.STOP:
++ break
++ if fid == 1:
++ if ftype == TType.LIST:
++ self.tables = []
++ (_etype590, _size587) = iprot.readListBegin()
++ for _i591 in xrange(_size587):
++ _elem592 = Table()
++ _elem592.read(iprot)
++ self.tables.append(_elem592)
+ iprot.readListEnd()
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('GetTablesResult')
+ if self.tables is not None:
+ oprot.writeFieldBegin('tables', TType.LIST, 1)
+ oprot.writeListBegin(TType.STRUCT, len(self.tables))
- for iter586 in self.tables:
- iter586.write(oprot)
++ for iter593 in self.tables:
++ iter593.write(oprot)
+ oprot.writeListEnd()
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ if self.tables is None:
+ raise TProtocol.TProtocolException(message='Required field tables is unset!')
+ return
+
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.tables)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
class TableMeta:
"""
Attributes:
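
The structs added above also have Java counterparts generated from the same Thrift IDL in org.apache.hadoop.hive.metastore.api, which the HiveMetaStoreClient changes below import. A minimal sketch of building the new GetTableRequest follows, assuming standard Thrift Java codegen (the set<Field> setter convention is an assumption; the Java gen code is not part of this diff):

    // A minimal sketch, assuming standard Thrift Java codegen for the
    // structs above; setter names follow the usual set<Field> convention
    // and are not shown in this diff.
    import org.apache.hadoop.hive.metastore.api.GetTableRequest;

    public class GetTableRequestSketch {
      public static GetTableRequest build(String db, String tbl) {
        GetTableRequest req = new GetTableRequest();
        req.setDbName(db);   // required field, per validate() above
        req.setTblName(tbl); // required field, per validate() above
        // capabilities (field 3) is optional and may stay unset; when set,
        // it carries a ClientCapabilities list of i32 capability values.
        return req;
      }
    }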
http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --cc metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 2801192,c0ef25e..3b0f368
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@@ -119,13 -236,9 +126,10 @@@ import org.slf4j.Logger
import org.slf4j.LoggerFactory;
import javax.jdo.JDOException;
-
import java.io.IOException;
- import java.net.InetAddress;
- import java.net.InetSocketAddress;
- import java.net.UnknownHostException;
import java.nio.ByteBuffer;
+import java.sql.SQLException;
+ import java.security.PrivilegedExceptionAction;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.AbstractMap;
@@@ -2272,18 -2531,34 +2378,34 @@@ public class HiveMetaStore extends Thri
continue;
}
+ final UserGroupInformation ugi;
+ try {
+ ugi = UserGroupInformation.getCurrentUser();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
- partFutures.add(threadPool.submit(new Callable() {
+ partFutures.add(threadPool.submit(new Callable<Partition>() {
@Override
public Partition call() throws Exception {
- boolean madeDir = createLocationForAddedPartition(table, part);
- if (addedPartitions.put(new PartValEqWrapper(part), madeDir) != null) {
- // Technically, for ifNotExists case, we could insert one and discard the other
- // because the first one now "exists", but it seems better to report the problem
- // upstream as such a command doesn't make sense.
- throw new MetaException("Duplicate partitions in the list: " + part);
- }
- initializeAddedPartition(table, part, madeDir);
+ ugi.doAs(new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws Exception {
+ try {
+ boolean madeDir = createLocationForAddedPartition(table, part);
+ if (addedPartitions.put(new PartValEqWrapper(part), madeDir) != null) {
+ // Technically, for ifNotExists case, we could insert one and discard the other
+ // because the first one now "exists", but it seems better to report the problem
+ // upstream as such a command doesn't make sense.
+ throw new MetaException("Duplicate partitions in the list: " + part);
+ }
+ initializeAddedPartition(table, part, madeDir);
+ } catch (MetaException e) {
+ throw new IOException(e.getMessage(), e);
+ }
+ return null;
+ }
+ });
return part;
}
}));
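
The substance of this hunk: the caller's UserGroupInformation is captured on the request thread and re-applied via doAs() inside the thread-pool workers, so partition locations are created as the calling user rather than as whatever identity the pool thread happens to carry. A minimal sketch of the pattern, with an illustrative task body:

    // A minimal sketch of the capture-then-doAs pattern used above
    // (Hadoop's UserGroupInformation API); the task body is illustrative.
    import java.security.PrivilegedExceptionAction;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;
    import org.apache.hadoop.security.UserGroupInformation;

    public class DoAsSketch {
      public static Future<Void> submitAsCaller(ExecutorService pool) throws Exception {
        // Capture the caller's identity before submit(), on the request thread.
        final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
        return pool.submit(new Callable<Void>() {
          @Override
          public Void call() throws Exception {
            // Re-establish that identity on the worker thread.
            return ugi.doAs(new PrivilegedExceptionAction<Void>() {
              @Override
              public Void run() throws Exception {
                // ... privileged work, e.g. creating partition directories ...
                return null;
              }
            });
          }
        });
      }
    }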
http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --cc metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index b8b81ed,2f59054..f698125
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@@ -28,8 -30,8 +30,9 @@@ import org.apache.hadoop.hive.common.cl
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.conf.HiveConfUtil;
+import org.apache.hadoop.hive.metastore.api.*;
import org.apache.hadoop.hive.metastore.TableType;
+ import org.apache.hadoop.hive.metastore.api.*;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.shims.ShimLoader;
http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 0000000,4c0f817..46c817a
mode 000000,100644..100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@@ -1,0 -1,250 +1,251 @@@
+ /**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.hadoop.hive.ql.exec;
+
+ import org.apache.hadoop.fs.FSDataInputStream;
+ import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+ import org.apache.hadoop.hive.ql.plan.CopyWork;
+ import org.apache.hadoop.hive.ql.plan.ReplCopyWork;
+
+ import java.io.BufferedReader;
+ import java.io.BufferedWriter;
+ import java.io.IOException;
+ import java.io.InputStreamReader;
+ import java.io.OutputStream;
+ import java.io.OutputStreamWriter;
+ import java.io.Serializable;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.List;
+
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import org.apache.hadoop.fs.FileStatus;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.hive.common.FileUtils;
+ import org.apache.hadoop.hive.conf.HiveConf;
+ import org.apache.hadoop.hive.ql.DriverContext;
+ import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer;
+ import org.apache.hadoop.hive.ql.plan.api.StageType;
+ import org.apache.hadoop.util.StringUtils;
+
+ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
+
+
+ private static final long serialVersionUID = 1L;
+
+ private static transient final Logger LOG = LoggerFactory.getLogger(ReplCopyTask.class);
+
+ public ReplCopyTask(){
+ super();
+ }
+
+ @Override
+ protected int execute(DriverContext driverContext) {
+ LOG.debug("ReplCopyTask.execute()");
+ FileSystem dstFs = null;
+ Path toPath = null;
+ try {
- Path fromPath = work.getFromPath();
- toPath = work.getToPath();
++ // TODO# merge with real CopyTask logic
++ Path fromPath = work.getFromPaths()[0];
++ toPath = work.getToPaths()[0];
+
+ console.printInfo("Copying data from " + fromPath.toString(), " to "
+ + toPath.toString());
+
+ ReplCopyWork rwork = ((ReplCopyWork)work);
+
+ FileSystem srcFs = fromPath.getFileSystem(conf);
+ dstFs = toPath.getFileSystem(conf);
+
+ List<FileStatus> srcFiles = new ArrayList<FileStatus>();
+ FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(srcFs, fromPath);
+ LOG.debug("ReplCopyTasks srcs=" + (srcs == null ? "null" : srcs.length));
+ if (! rwork.getReadListFromInput()){
+ if (srcs == null || srcs.length == 0) {
+ if (work.isErrorOnSrcEmpty()) {
+ console.printError("No files matching path: " + fromPath.toString());
+ return 3;
+ } else {
+ return 0;
+ }
+ }
+ } else {
+ LOG.debug("ReplCopyTask making sense of _files");
+ // Our input is probably the result of a _files listing, we should expand out _files.
+ srcFiles = filesInFileListing(srcFs,fromPath);
+ LOG.debug("ReplCopyTask _files contains:" + (srcFiles == null ? "null" : srcFiles.size()));
+ if (srcFiles == null){
+ if (work.isErrorOnSrcEmpty()) {
+ console.printError("No _files entry found on source: " + fromPath.toString());
+ return 5;
+ } else {
+ return 0;
+ }
+ }
+ }
+ // Add in all the lone filecopies expected as well - applies to
+ // both _files case stragglers and regular copies
+ srcFiles.addAll(Arrays.asList(srcs));
+ LOG.debug("ReplCopyTask numFiles:" + (srcFiles == null ? "null" : srcFiles.size()));
+
+ boolean inheritPerms = conf.getBoolVar(HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
+ if (!FileUtils.mkdir(dstFs, toPath, inheritPerms, conf)) {
+ console.printError("Cannot make target directory: " + toPath.toString());
+ return 2;
+ }
+
+ BufferedWriter listBW = null;
+ if (rwork.getListFilesOnOutputBehaviour()){
+ Path listPath = new Path(toPath,"_files");
+ LOG.debug("ReplCopyTask : generating _files at :" + listPath.toUri().toString());
+ if (dstFs.exists(listPath)){
+ console.printError("Cannot make target _files file:" + listPath.toString());
+ return 4;
+ }
+ listBW = new BufferedWriter(new OutputStreamWriter(dstFs.create(listPath)));
+ // TODO : verify that not specifying charset here does not bite us
+ // later (for cases where filenames have unicode chars)
+ }
+
+ for (FileStatus oneSrc : srcFiles) {
+ console.printInfo("Copying file: " + oneSrc.getPath().toString());
+ LOG.debug("Copying file: " + oneSrc.getPath().toString());
+ if (!rwork.getListFilesOnOutputBehaviour(oneSrc)){
+ FileSystem actualSrcFs = null;
+ if (rwork.getReadListFromInput()){
+ // TODO : filesystemcache prevents this from being a perf nightmare, but we
+ // should still probably follow up to see if we need to do something better here.
+ actualSrcFs = oneSrc.getPath().getFileSystem(conf);
+ } else {
+ actualSrcFs = srcFs;
+ }
+
+ LOG.debug("ReplCopyTask :cp:" + oneSrc.getPath() + "=>" + toPath);
+ if (!FileUtils.copy(actualSrcFs, oneSrc.getPath(), dstFs, toPath,
+ false, // delete source
+ true, // overwrite destination
+ conf)) {
+ console.printError("Failed to copy: '" + oneSrc.getPath().toString()
+ + "to: '" + toPath.toString() + "'");
+ return 1;
+ }
+ }else{
+ LOG.debug("ReplCopyTask _files now tracks:" + oneSrc.getPath().toUri());
+ console.printInfo("Tracking file: " + oneSrc.getPath().toUri());
+ listBW.write(oneSrc.getPath().toUri().toString() + "\n");
+ }
+ }
+
+ if (listBW != null){
+ listBW.close();
+ }
+
+ return 0;
+
+ } catch (Exception e) {
+ console.printError("Failed with exception " + e.getMessage(), "\n"
+ + StringUtils.stringifyException(e));
+ return (1);
+ }
+ }
+
+
+ private List<FileStatus> filesInFileListing(FileSystem fs, Path path)
+ throws IOException {
+ Path fileListing = new Path(path, "_files");
+ LOG.debug("ReplCopyTask filesInFileListing() reading " + fileListing.toUri());
+ if (! fs.exists(fileListing)){
+ LOG.debug("ReplCopyTask : _files does not exist");
+ return null; // Returning null from this fn can serve as an err condition.
+ // On success, but with nothing to return, we can return an empty list.
+ }
+
+ List<FileStatus> ret = new ArrayList<FileStatus>();
+ BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(fileListing)));
+ // TODO : verify if skipping charset here is okay
+
+ String line = null;
+ while ( (line = br.readLine()) != null){
+ LOG.debug("ReplCopyTask :_filesReadLine:" + line);
+ Path p = new Path(line);
+ FileSystem srcFs = p.getFileSystem(conf); // TODO : again, fs cache should make this okay, but if not, revisit
+ ret.add(srcFs.getFileStatus(p));
+ // Note - we need srcFs rather than fs, because it is possible that the _files lists files
+ // which are from a different filesystem than the fs where the _files file itself was loaded
+ // from. Currently, it is possible, for eg., to do REPL LOAD hdfs://<ip>/dir/ and for the _files
+ // in it to contain hdfs://<name>/ entries, and/or vice-versa, and this causes errors.
+ // It might also be possible that there will be a mix of them in a given _files file.
+ // TODO: revisit close to the end of replv2 dev, to see if our assumption now still holds,
+ // and if not so, optimize.
+ }
+
+ return ret;
+ }
+
+ @Override
+ public StageType getType() {
+ return StageType.COPY;
+ // there's no extensive need for this to have its own type - it mirrors
+ // the intent of copy enough. This might change later, though.
+ }
+
+ @Override
+ public String getName() {
+ return "REPL_COPY";
+ }
+
+ public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, HiveConf conf) {
+ Task<?> copyTask = null;
+ LOG.debug("ReplCopyTask:getLoadCopyTask: "+srcPath + "=>" + dstPath);
+ if (replicationSpec.isInReplicationScope()){
+ ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false);
+ LOG.debug("ReplCopyTask:\trcwork");
+ if (replicationSpec.isLazy()){
+ LOG.debug("ReplCopyTask:\tlazy");
+ rcwork.setReadListFromInput(true);
+ }
+ copyTask = TaskFactory.get(rcwork, conf);
+ } else {
+ LOG.debug("ReplCopyTask:\tcwork");
+ copyTask = TaskFactory.get(new CopyWork(srcPath, dstPath, false), conf);
+ }
+ return copyTask;
+ }
+
+ public static Task<?> getDumpCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, HiveConf conf) {
+ Task<?> copyTask = null;
+ LOG.debug("ReplCopyTask:getDumpCopyTask: "+srcPath + "=>" + dstPath);
+ if (replicationSpec.isInReplicationScope()){
+ ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false);
+ LOG.debug("ReplCopyTask:\trcwork");
+ if (replicationSpec.isLazy()){
+ LOG.debug("ReplCopyTask:\tlazy");
+ rcwork.setListFilesOnOutputBehaviour(true);
+ }
+ copyTask = TaskFactory.get(rcwork, conf);
+ } else {
+ LOG.debug("ReplCopyTask:\tcwork");
+ copyTask = TaskFactory.get(new CopyWork(srcPath, dstPath, false), conf);
+ }
+ return copyTask;
+ }
+
+ }
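
Taken together, the two factory methods pair a dump with a later load through the _files manifest: a lazy, in-replication-scope dump sets setListFilesOnOutputBehaviour(true), so the task writes one source URI per line into <dest>/_files instead of copying data, and the matching load sets setReadListFromInput(true), so filesInFileListing() expands that manifest back into copy sources. A sketch of the pairing, with illustrative paths:

    // A sketch of pairing the factory methods above for a lazy replication
    // dump/load; the paths are illustrative.
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
    import org.apache.hadoop.hive.ql.exec.Task;
    import org.apache.hadoop.hive.ql.parse.ReplicationSpec;

    public class ReplCopySketch {
      public static void buildTasks(ReplicationSpec spec, HiveConf conf) {
        Path src  = new Path("hdfs://src-nn/warehouse/t");  // source table data
        Path dump = new Path("hdfs://src-nn/repl/dump/t");  // dump staging dir
        Path dst  = new Path("hdfs://dst-nn/warehouse/t");  // load destination

        // Dump side: with a lazy, in-scope spec this writes dump/_files
        // (one source URI per line) instead of copying the data itself.
        Task<?> dumpTask = ReplCopyTask.getDumpCopyTask(spec, src, dump, conf);

        // Load side: reads dump/_files and copies each listed URI to dst,
        // resolving each line against its own filesystem.
        Task<?> loadTask = ReplCopyTask.getLoadCopyTask(spec, dump, dst, conf);
      }
    }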
http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 40d055a,867e445..7e1e806
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@@ -318,9 -318,10 +318,10 @@@ public class DbTxnManager extends HiveT
// This is a file or something we don't hold locks for.
continue;
}
- if(t != null && AcidUtils.isAcidTable(t)) {
+ if(t != null && AcidUtils.isFullAcidTable(t)) {
compBuilder.setIsAcid(true);
}
+ compBuilder.setIsDynamicPartitionWrite(output.isDynamicPartitionWrite());
LockComponent comp = compBuilder.build();
LOG.debug("Adding lock component to lock request " + comp.toString());
rqstBuilder.addLockComponent(comp);
http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
index 12dea9c,f61274b..b46f615
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
@@@ -32,23 -22,28 +32,31 @@@ import java.io.FileNotFoundException
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
+ import java.util.HashSet;
+ import java.util.List;
import org.antlr.runtime.tree.Tree;
++import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
+ import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
+ import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+ import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+ import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.plan.CopyWork;
+ import org.slf4j.Logger;
/**
* ExportSemanticAnalyzer.
@@@ -197,53 -195,22 +218,75 @@@ public class ExportSemanticAnalyzer ext
for (Partition partition : partitions) {
Path fromPath = partition.getDataLocation();
Path toPartPath = new Path(parentPath, partition.getName());
- CopyWork cw = createCopyWork(isMmTable, lbLevels, ids, fromPath, toPartPath);
- rootTasks.add(TaskFactory.get(cw, conf));
- Task<? extends Serializable> rTask =
- ReplCopyTask.getDumpCopyTask(replicationSpec, fromPath, toPartPath, conf);
- rootTasks.add(rTask);
++ Task<?> copyTask = null;
++ if (replicationSpec.isInReplicationScope()) {
++ if (isMmTable) {
++ // TODO: ReplCopyTask is completely screwed. Need to support when it's not as screwed.
++ throw new SemanticException(
++ "Not supported right now because Replication is completely screwed");
++ }
++ copyTask = ReplCopyTask.getDumpCopyTask(replicationSpec, fromPath, toPartPath, conf);
++ } else {
++ CopyWork cw = createCopyWork(isMmTable, lbLevels, ids, fromPath, toPartPath, conf);
++ copyTask = TaskFactory.get(cw, conf);
++ }
++ rootTasks.add(copyTask);
inputs.add(new ReadEntity(partition));
}
} else {
Path fromPath = ts.tableHandle.getDataLocation();
Path toDataPath = new Path(parentPath, "data");
- CopyWork cw = createCopyWork(isMmTable, lbLevels, ids, fromPath, toDataPath);
- rootTasks.add(TaskFactory.get(cw, conf));
- Task<? extends Serializable> rTask =
- ReplCopyTask.getDumpCopyTask(replicationSpec, fromPath, toDataPath, conf);
- rootTasks.add(rTask);
++ Task<?> copyTask = null;
++ if (replicationSpec.isInReplicationScope()) {
++ if (isMmTable) {
++ // TODO: ReplCopyTask is completely screwed. Need to support when it's not as screwed.
++ throw new SemanticException(
++ "Not supported right now because Replication is completely screwed");
++ }
++ copyTask = ReplCopyTask.getDumpCopyTask(replicationSpec, fromPath, toDataPath, conf);
++ } else {
++ CopyWork cw = createCopyWork(isMmTable, lbLevels, ids, fromPath, toDataPath, conf);
++ copyTask = TaskFactory.get(cw, conf);
++ }
++ rootTasks.add(copyTask);
inputs.add(new ReadEntity(ts.tableHandle));
}
- outputs.add(toWriteEntity(parentPath));
- outputs.add(toWriteEntity(parentPath,conf));
++ outputs.add(toWriteEntity(parentPath, conf));
+ } catch (HiveException | IOException ex) {
+ throw new SemanticException(ex);
}
}
- private CopyWork createCopyWork(boolean isMmTable, int lbLevels, ValidWriteIds ids,
- Path fromPath, Path toDataPath) throws IOException {
++ private static CopyWork createCopyWork(boolean isMmTable, int lbLevels, ValidWriteIds ids,
++ Path fromPath, Path toDataPath, Configuration conf) throws IOException {
+ List<Path> validPaths = null;
+ if (isMmTable) {
+ fromPath = fromPath.getFileSystem(conf).makeQualified(fromPath);
+ validPaths = Utilities.getValidMmDirectoriesFromTableOrPart(fromPath, conf, ids, lbLevels);
+ }
+ if (validPaths == null) {
+ return new CopyWork(fromPath, toDataPath, false); // Not MM, or no need to skip anything.
+ } else {
+ return createCopyWorkForValidPaths(fromPath, toDataPath, validPaths);
+ }
+ }
- private CopyWork createCopyWorkForValidPaths(
++ private static CopyWork createCopyWorkForValidPaths(
+ Path fromPath, Path toPartPath, List<Path> validPaths) {
+ Path[] from = new Path[validPaths.size()], to = new Path[validPaths.size()];
+ int i = 0;
+ String fromPathStr = fromPath.toString();
+ if (!fromPathStr.endsWith(Path.SEPARATOR)) {
+ fromPathStr += "/";
+ }
+ for (Path validPath : validPaths) {
+ from[i] = validPath;
+ // TODO: assumes the results are already qualified.
+ to[i] = new Path(toPartPath, validPath.toString().substring(fromPathStr.length()));
+ Utilities.LOG14535.info("Will copy " + from[i] + " to " + to[i]
+ + " based on dest " + toPartPath + ", from " + fromPathStr + ", subpath " + validPath);
+ ++i;
+ }
+ return new CopyWork(from, to);
+ }
}
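
The path arithmetic in createCopyWorkForValidPaths() preserves each valid directory's position relative to the source root: the fromPath prefix (with a trailing separator) is stripped off and the remainder is re-rooted under the destination. A worked sketch, with illustrative paths and MM directory name:

    // A worked sketch of the mapping above; paths and the MM directory
    // name are illustrative, not taken from a real export.
    import org.apache.hadoop.fs.Path;

    public class CopyPathSketch {
      public static void main(String[] args) {
        Path fromPath   = new Path("hdfs://nn/warehouse/t");            // qualified source root
        Path toPartPath = new Path("hdfs://nn/staging/t_export/data");  // export destination
        Path validPath  = new Path("hdfs://nn/warehouse/t/mm_5");       // one valid MM dir

        String fromPathStr = fromPath.toString();
        if (!fromPathStr.endsWith(Path.SEPARATOR)) {
          fromPathStr += Path.SEPARATOR;
        }
        // Strip the source prefix and re-root the remainder under the destination.
        Path to = new Path(toPartPath, validPath.toString().substring(fromPathStr.length()));
        System.out.println(to); // hdfs://nn/staging/t_export/data/mm_5
      }
    }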
http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 8aa076f,3420efd..bceef45
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@@ -33,6 -33,6 +33,7 @@@ import java.util.TreeMap
import org.antlr.runtime.tree.Tree;
import org.apache.commons.lang.ObjectUtils;
++import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@@ -50,7 -49,7 +51,8 @@@ import org.apache.hadoop.hive.metastore
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.ImportCommitWork;
+ import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@@ -119,12 -121,8 +125,9 @@@ public class ImportSemanticAnalyzer ext
Map.Entry<String,String> dbTablePair = getDbTableNamePair(tableNameNode);
parsedDbName = dbTablePair.getKey();
parsedTableName = dbTablePair.getValue();
- if (parsedDbName != null){
- isDbNameSet = true;
- }
// get partition metadata if partition specified
if (child.getChildCount() == 2) {
+ @SuppressWarnings("unused") // TODO: wtf?
ASTNode partspec = (ASTNode) child.getChild(1);
isPartSpecSet = true;
parsePartitionSpec(child, parsedPartSpec);
@@@ -134,126 -132,11 +137,10 @@@
}
// parsing statement is now done, on to logic.
-
- // initialize load path
- URI fromURI = EximUtil.getValidatedURI(conf, stripQuotes(fromTree.getText()));
- FileSystem fs = FileSystem.get(fromURI, conf);
- Path fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath());
- inputs.add(toReadEntity(fromPath));
-
- EximUtil.ReadMetaData rv = new EximUtil.ReadMetaData();
- try {
- rv = EximUtil.readMetaData(fs, new Path(fromPath, METADATA_NAME));
- } catch (IOException e) {
- throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
- }
-
- ReplicationSpec replicationSpec = rv.getReplicationSpec();
- if (replicationSpec.isNoop()){
- // nothing to do here, silently return.
- return;
- }
-
- String dbname = SessionState.get().getCurrentDatabase();
- if (isDbNameSet){
- // If the parsed statement contained a db.tablename specification, prefer that.
- dbname = parsedDbName;
- }
-
- // Create table associated with the import
- // Executed if relevant, and used to contain all the other details about the table if not.
- CreateTableDesc tblDesc = getBaseCreateTableDescFromTable(dbname, rv.getTable());
- boolean isSourceMm = MetaStoreUtils.isInsertOnlyTable(tblDesc.getTblProps());
-
- if (isExternalSet) {
- if (isSourceMm) {
- throw new SemanticException("Cannot import an MM table as external");
- }
- tblDesc.setExternal(isExternalSet);
- // This condition-check could have been avoided, but to honour the old
- // default of not calling if it wasn't set, we retain that behaviour.
- // TODO:cleanup after verification that the outer if isn't really needed here
- }
-
- if (isLocationSet){
- tblDesc.setLocation(parsedLocation);
- inputs.add(toReadEntity(parsedLocation));
- }
-
- if (isTableSet){
- tblDesc.setTableName(parsedTableName);
- }
-
- List<AddPartitionDesc> partitionDescs = new ArrayList<AddPartitionDesc>();
- Iterable<Partition> partitions = rv.getPartitions();
- for (Partition partition : partitions) {
- // TODO: this should ideally not create AddPartitionDesc per partition
- AddPartitionDesc partsDesc = getBaseAddPartitionDescFromPartition(fromPath, dbname, tblDesc, partition);
- partitionDescs.add(partsDesc);
- }
-
- if (isPartSpecSet){
- // The import specification asked for only a particular partition to be loaded
- // We load only that, and ignore all the others.
- boolean found = false;
- for (Iterator<AddPartitionDesc> partnIter = partitionDescs
- .listIterator(); partnIter.hasNext();) {
- AddPartitionDesc addPartitionDesc = partnIter.next();
- if (!found && addPartitionDesc.getPartition(0).getPartSpec().equals(parsedPartSpec)) {
- found = true;
- } else {
- partnIter.remove();
- }
- }
- if (!found) {
- throw new SemanticException(
- ErrorMsg.INVALID_PARTITION
- .getMsg(" - Specified partition not found in import directory"));
- }
- }
-
- if (tblDesc.getTableName() == null) {
- // Either we got the tablename from the IMPORT statement (first priority)
- // or from the export dump.
- throw new SemanticException(ErrorMsg.NEED_TABLE_SPECIFICATION.getMsg());
- } else {
- conf.set("import.destination.table", tblDesc.getTableName());
- for (AddPartitionDesc addPartitionDesc : partitionDescs) {
- addPartitionDesc.setTableName(tblDesc.getTableName());
- }
- }
-
- Warehouse wh = new Warehouse(conf);
- Table table = tableIfExists(tblDesc);
-
- if (table != null){
- checkTable(table, tblDesc, replicationSpec);
- LOG.debug("table " + tblDesc.getTableName() + " exists: metadata checked");
- tableExists = true;
- }
-
- Long mmWriteId = null;
- if (table != null && MetaStoreUtils.isInsertOnlyTable(table.getParameters())) {
- mmWriteId = db.getNextTableWriteId(table.getDbName(), table.getTableName());
- } else if (table == null && isSourceMm) {
- // We could import everything as is - directories and IDs, but that won't work with ACID
- // txn ids in future. So, let's import everything into the new MM directory with ID == 0.
- mmWriteId = 0l;
- }
- if (mmWriteId != null) {
- tblDesc.setInitialMmWriteId(mmWriteId);
- }
- if (!replicationSpec.isInReplicationScope()){
- createRegularImportTasks(
- rootTasks, tblDesc, partitionDescs,
- isPartSpecSet, replicationSpec, table,
- fromURI, fs, wh, mmWriteId, isSourceMm);
- } else {
- createReplImportTasks(
- rootTasks, tblDesc, partitionDescs,
- isPartSpecSet, replicationSpec, table,
- fromURI, fs, wh, mmWriteId, isSourceMm);
- }
+ tableExists = prepareImport(
+ isLocationSet, isExternalSet, isPartSpecSet, waitOnCreateDb,
+ parsedLocation, parsedTableName, parsedDbName, parsedPartSpec, fromTree.getText(),
+ new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, outputs, rootTasks, LOG, ctx));
-
} catch (SemanticException e) {
throw e;
} catch (Exception e) {
@@@ -284,7 -167,123 +171,138 @@@
}
}
- private AddPartitionDesc getBaseAddPartitionDescFromPartition(
+ public static boolean prepareImport(
+ boolean isLocationSet, boolean isExternalSet, boolean isPartSpecSet, boolean waitOnCreateDb,
+ String parsedLocation, String parsedTableName, String parsedDbName,
+ LinkedHashMap<String, String> parsedPartSpec,
- String fromLocn, EximUtil.SemanticAnalyzerWrapperContext x
- ) throws IOException, MetaException, HiveException, URISyntaxException {
++ String fromLocn, EximUtil.SemanticAnalyzerWrapperContext x)
++ throws IOException, MetaException, HiveException, URISyntaxException {
+
+ // initialize load path
+ URI fromURI = EximUtil.getValidatedURI(x.getConf(), stripQuotes(fromLocn));
+ Path fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath());
+
+ FileSystem fs = FileSystem.get(fromURI, x.getConf());
+ x.getInputs().add(toReadEntity(fromPath, x.getConf()));
+
+ EximUtil.ReadMetaData rv = new EximUtil.ReadMetaData();
+ try {
+ rv = EximUtil.readMetaData(fs, new Path(fromPath, EximUtil.METADATA_NAME));
+ } catch (IOException e) {
+ throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
+ }
+
+ ReplicationSpec replicationSpec = rv.getReplicationSpec();
+ if (replicationSpec.isNoop()){
+ // nothing to do here, silently return.
+ return false;
+ }
+
+ String dbname = SessionState.get().getCurrentDatabase();
+ if ((parsedDbName !=null) && (!parsedDbName.isEmpty())){
+ // If the parsed statement contained a db.tablename specification, prefer that.
+ dbname = parsedDbName;
+ }
+
+ // Create table associated with the import
+ // Executed if relevant, and used to contain all the other details about the table if not.
+ CreateTableDesc tblDesc = getBaseCreateTableDescFromTable(dbname,rv.getTable());
++ boolean isSourceMm = MetaStoreUtils.isInsertOnlyTable(tblDesc.getTblProps());
+
+ if (isExternalSet){
++ if (isSourceMm) {
++ throw new SemanticException("Cannot import an MM table as external");
++ }
+ tblDesc.setExternal(isExternalSet);
+ // This condition-check could have been avoided, but to honour the old
+ // default of not calling if it wasn't set, we retain that behaviour.
+ // TODO:cleanup after verification that the outer if isn't really needed here
+ }
+
+ if (isLocationSet){
+ tblDesc.setLocation(parsedLocation);
+ x.getInputs().add(toReadEntity(new Path(parsedLocation), x.getConf()));
+ }
+
+ if ((parsedTableName!= null) && (!parsedTableName.isEmpty())){
+ tblDesc.setTableName(parsedTableName);
+ }
+
+ List<AddPartitionDesc> partitionDescs = new ArrayList<AddPartitionDesc>();
+ Iterable<Partition> partitions = rv.getPartitions();
+ for (Partition partition : partitions) {
+ // TODO: this should ideally not create AddPartitionDesc per partition
+ AddPartitionDesc partsDesc = getBaseAddPartitionDescFromPartition(fromPath, dbname, tblDesc, partition);
+ partitionDescs.add(partsDesc);
+ }
+
+ if (isPartSpecSet){
+ // The import specification asked for only a particular partition to be loaded
+ // We load only that, and ignore all the others.
+ boolean found = false;
+ for (Iterator<AddPartitionDesc> partnIter = partitionDescs
+ .listIterator(); partnIter.hasNext();) {
+ AddPartitionDesc addPartitionDesc = partnIter.next();
+ if (!found && addPartitionDesc.getPartition(0).getPartSpec().equals(parsedPartSpec)) {
+ found = true;
+ } else {
+ partnIter.remove();
+ }
+ }
+ if (!found) {
+ throw new SemanticException(
+ ErrorMsg.INVALID_PARTITION
+ .getMsg(" - Specified partition not found in import directory"));
+ }
+ }
+
+ if (tblDesc.getTableName() == null) {
+ // Either we got the tablename from the IMPORT statement (first priority)
+ // or from the export dump.
+ throw new SemanticException(ErrorMsg.NEED_TABLE_SPECIFICATION.getMsg());
+ } else {
+ x.getConf().set("import.destination.table", tblDesc.getTableName());
+ for (AddPartitionDesc addPartitionDesc : partitionDescs) {
+ addPartitionDesc.setTableName(tblDesc.getTableName());
+ }
+ }
+
+ Warehouse wh = new Warehouse(x.getConf());
+ Table table = tableIfExists(tblDesc, x.getHive());
+ boolean tableExists = false;
+
+ if (table != null){
+ checkTable(table, tblDesc,replicationSpec, x.getConf());
+ x.getLOG().debug("table " + tblDesc.getTableName() + " exists: metadata checked");
+ tableExists = true;
+ }
+
- if (!replicationSpec.isInReplicationScope()){
++ Long mmWriteId = null;
++ if (table != null && MetaStoreUtils.isInsertOnlyTable(table.getParameters())) {
++ mmWriteId = x.getHive().getNextTableWriteId(table.getDbName(), table.getTableName());
++ } else if (table == null && isSourceMm) {
++ // We could import everything as is - directories and IDs, but that won't work with ACID
++ // txn ids in future. So, let's import everything into the new MM directory with ID == 0.
++ mmWriteId = 0l;
++ }
++ if (mmWriteId != null) {
++ tblDesc.setInitialMmWriteId(mmWriteId);
++ }
++ if (!replicationSpec.isInReplicationScope()) {
+ createRegularImportTasks(
+ tblDesc, partitionDescs,
+ isPartSpecSet, replicationSpec, table,
- fromURI, fs, wh, x);
++ fromURI, fs, wh, x, mmWriteId, isSourceMm);
+ } else {
+ createReplImportTasks(
+ tblDesc, partitionDescs,
+ isPartSpecSet, replicationSpec, waitOnCreateDb, table,
- fromURI, fs, wh, x);
++ fromURI, fs, wh, x, mmWriteId, isSourceMm);
+ }
+ return tableExists;
+ }
+
+ private static AddPartitionDesc getBaseAddPartitionDescFromPartition(
Path fromPath, String dbname, CreateTableDesc tblDesc, Partition partition) throws MetaException {
AddPartitionDesc partsDesc = new AddPartitionDesc(dbname, tblDesc.getTableName(),
EximUtil.makePartSpec(tblDesc.getPartCols(), partition.getValues()),
@@@ -337,98 -336,95 +355,140 @@@
return tblDesc;
}
- private Task<?> loadTable(URI fromURI, Table table, boolean replace, Path tgtPath,
++
+ private static Task<?> loadTable(URI fromURI, Table table, boolean replace, Path tgtPath,
- ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x) {
++ ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x,
+ Long mmWriteId, boolean isSourceMm) {
Path dataPath = new Path(fromURI.toString(), "data");
- Path destPath = mmWriteId == null ? ctx.getExternalTmpPath(tgtPath)
- Path tmpPath = x.getCtx().getExternalTmpPath(tgtPath);
- Task<?> copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, x.getConf());
- LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath,
- Utilities.getTableDesc(table), new TreeMap<String, String>(),
- replace);
- Task<?> loadTableTask = TaskFactory.get(new MoveWork(x.getInputs(),
- x.getOutputs(), loadTableWork, null, false), x.getConf());
++ Path destPath = mmWriteId == null ? x.getCtx().getExternalTmpPath(tgtPath)
+ : new Path(tgtPath, ValidWriteIds.getMmFilePrefix(mmWriteId));
+ Utilities.LOG14535.info("adding import work for table with source location: "
+ + dataPath + "; table: " + tgtPath + "; copy destination " + destPath + "; mm "
+ + mmWriteId + " (src " + isSourceMm + ") for " + (table == null ? "a new table" : table.getTableName()));
+
- CopyWork cv = new CopyWork(dataPath, destPath, false);
- cv.setSkipSourceMmDirs(isSourceMm);
++ Task<?> copyTask = null;
++ if (replicationSpec.isInReplicationScope()) {
++ if (isSourceMm || mmWriteId != null) {
++ // TODO: ReplCopyTask is completely screwed. Need to support when it's not as screwed.
++ throw new RuntimeException(
++ "Not supported right now because Replication is completely screwed");
++ }
++ copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf());
++ } else {
++ CopyWork cw = new CopyWork(dataPath, destPath, false);
++ cw.setSkipSourceMmDirs(isSourceMm);
++ copyTask = TaskFactory.get(cw, x.getConf());
++ }
++
+ LoadTableDesc loadTableWork = new LoadTableDesc(destPath,
+ Utilities.getTableDesc(table), new TreeMap<String, String>(), replace, mmWriteId);
- MoveWork mv = new MoveWork(getInputs(), getOutputs(), loadTableWork, null, false);
- @SuppressWarnings("unchecked")
- Task<?> loadTableTask = TaskFactory.get(mv, conf), copyTask = TaskFactory.get(cv, conf);
++ MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, null, false);
++ Task<?> loadTableTask = TaskFactory.get(mv, x.getConf());
copyTask.addDependentTask(loadTableTask);
- rootTasks.add(copyTask);
+ x.getTasks().add(copyTask);
return loadTableTask;
}
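Worth noting about the destination logic above: with an MM write id, the copy writes directly into a write-id-prefixed subdirectory of the final table location instead of a scratch directory, so the subsequent MoveTask finalizes in place rather than relocating data. A sketch of the path choice using java.nio, where the "mm_" prefix is an assumption standing in for whatever ValidWriteIds.getMmFilePrefix() actually produces:

    import java.nio.file.Path;
    import java.nio.file.Paths;

    // Sketch only: "mm_" is an assumed stand-in for ValidWriteIds.getMmFilePrefix().
    final class CopyDestSketch {
      static Path copyDestination(Path tablePath, Path scratchPath, Long mmWriteId) {
        return mmWriteId == null
            ? scratchPath                            // classic import: copy to tmp, move later
            : tablePath.resolve("mm_" + mmWriteId);  // MM import: copy straight under the table
      }

      public static void main(String[] args) {
        System.out.println(copyDestination(
            Paths.get("/warehouse/db/t"), Paths.get("/tmp/scratch"), 0L));
        // prints /warehouse/db/t/mm_0
      }
    }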
- @SuppressWarnings("unchecked")
- private Task<?> createTableTask(CreateTableDesc tableDesc){
- return TaskFactory.get(new DDLWork(getInputs(), getOutputs(), tableDesc), conf);
+ private static Task<?> createTableTask(CreateTableDesc tableDesc, EximUtil.SemanticAnalyzerWrapperContext x) {
+ return TaskFactory.get(new DDLWork(
+ x.getInputs(),
+ x.getOutputs(),
+ tableDesc
+ ), x.getConf());
}
- @SuppressWarnings("unchecked")
- private Task<?> dropTableTask(Table table){
- return TaskFactory.get(new DDLWork(getInputs(), getOutputs(),
- new DropTableDesc(table.getTableName(), null, true, true, null)), conf);
+ private static Task<?> dropTableTask(Table table, EximUtil.SemanticAnalyzerWrapperContext x) {
+ return TaskFactory.get(new DDLWork(
+ x.getInputs(),
+ x.getOutputs(),
+ new DropTableDesc(table.getTableName(), null, true, true, null)
+ ), x.getConf());
}
- @SuppressWarnings("unchecked")
- private Task<? extends Serializable> alterTableTask(CreateTableDesc tableDesc) {
+ private static Task<? extends Serializable> alterTableTask(CreateTableDesc tableDesc,
+ EximUtil.SemanticAnalyzerWrapperContext x) {
tableDesc.setReplaceMode(true);
- return TaskFactory.get(new DDLWork(getInputs(), getOutputs(), tableDesc), conf);
+ return TaskFactory.get(new DDLWork(
+ x.getInputs(),
+ x.getOutputs(),
+ tableDesc
+ ), x.getConf());
}
- private Task<? extends Serializable> alterSinglePartition(
+ private static Task<? extends Serializable> alterSinglePartition(
URI fromURI, FileSystem fs, CreateTableDesc tblDesc,
Table table, Warehouse wh, AddPartitionDesc addPartitionDesc,
- ReplicationSpec replicationSpec, org.apache.hadoop.hive.ql.metadata.Partition ptn) {
+ ReplicationSpec replicationSpec, org.apache.hadoop.hive.ql.metadata.Partition ptn,
+ EximUtil.SemanticAnalyzerWrapperContext x) {
addPartitionDesc.setReplaceMode(true);
addPartitionDesc.getPartition(0).setLocation(ptn.getLocation()); // use existing location
- @SuppressWarnings("unchecked")
- Task<?> r = TaskFactory.get(new DDLWork(getInputs(), getOutputs(), addPartitionDesc), conf);
- return r;
+ return TaskFactory.get(new DDLWork(
+ x.getInputs(),
+ x.getOutputs(),
+ addPartitionDesc
+ ), x.getConf());
}
-
- private Task<?> addSinglePartition(URI fromURI, FileSystem fs, CreateTableDesc tblDesc,
- Table table, Warehouse wh, AddPartitionDesc addPartitionDesc,
- ReplicationSpec replicationSpec, Long mmWriteId, boolean isSourceMm, Task<?> commitTask)
+ private static Task<?> addSinglePartition(URI fromURI, FileSystem fs, CreateTableDesc tblDesc,
- Table table, Warehouse wh,
- AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x)
++ Table table, Warehouse wh, AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec,
++ EximUtil.SemanticAnalyzerWrapperContext x, Long mmWriteId, boolean isSourceMm,
++ Task<?> commitTask)
throws MetaException, IOException, HiveException {
AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0);
if (tblDesc.isExternal() && tblDesc.getLocation() == null) {
- LOG.debug("Importing in-place: adding AddPart for partition "
+ x.getLOG().debug("Importing in-place: adding AddPart for partition "
+ partSpecToString(partSpec.getPartSpec()));
// addPartitionDesc already has the right partition location
+ @SuppressWarnings("unchecked")
- Task<?> addPartTask = TaskFactory.get(new DDLWork(getInputs(),
- getOutputs(), addPartitionDesc), conf);
+ Task<?> addPartTask = TaskFactory.get(new DDLWork(x.getInputs(),
+ x.getOutputs(), addPartitionDesc), x.getConf());
return addPartTask;
} else {
String srcLocation = partSpec.getLocation();
- fixLocationInPartSpec(fs, tblDesc, table, wh, replicationSpec, partSpec);
+ fixLocationInPartSpec(fs, tblDesc, table, wh, replicationSpec, partSpec, x);
+ x.getLOG().debug("adding dependent CopyWork/AddPart/MoveWork for partition "
+ + partSpecToString(partSpec.getPartSpec())
+ + " with source location: " + srcLocation);
Path tgtLocation = new Path(partSpec.getLocation());
- Path destPath = mmWriteId == null ? ctx.getExternalTmpPath(tgtLocation)
- Path tmpPath = x.getCtx().getExternalTmpPath(tgtLocation);
- Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
- replicationSpec, new Path(srcLocation), tmpPath, x.getConf());
++ Path destPath = mmWriteId == null ? x.getCtx().getExternalTmpPath(tgtLocation)
+ : new Path(tgtLocation, ValidWriteIds.getMmFilePrefix(mmWriteId));
+ Path moveTaskSrc = mmWriteId == null ? destPath : tgtLocation;
+ Utilities.LOG14535.info("adding import work for partition with source location: "
+ + srcLocation + "; target: " + tgtLocation + "; copy dest " + destPath + "; mm "
+ + mmWriteId + " (src " + isSourceMm + ") for " + partSpecToString(partSpec.getPartSpec()));
- CopyWork cw = new CopyWork(new Path(srcLocation), destPath, false);
- cw.setSkipSourceMmDirs(isSourceMm);
- DDLWork dw = new DDLWork(getInputs(), getOutputs(), addPartitionDesc);
++
++ Task<?> copyTask = null;
++ if (replicationSpec.isInReplicationScope()) {
++ if (isSourceMm || mmWriteId != null) {
++ // TODO: ReplCopyTask cannot handle MM tables yet; add support before enabling this path.
++ throw new RuntimeException(
++ "Replicating MM tables is not supported yet");
++ }
++ copyTask = ReplCopyTask.getLoadCopyTask(
++ replicationSpec, new Path(srcLocation), destPath, x.getConf());
++ } else {
++ CopyWork cw = new CopyWork(new Path(srcLocation), destPath, false);
++ cw.setSkipSourceMmDirs(isSourceMm);
++ copyTask = TaskFactory.get(cw, x.getConf());
++ }
++
+ Task<?> addPartTask = TaskFactory.get(new DDLWork(x.getInputs(),
+ x.getOutputs(), addPartitionDesc), x.getConf());
- LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath,
- Utilities.getTableDesc(table),
- partSpec.getPartSpec(), true);
+ LoadTableDesc loadTableWork = new LoadTableDesc(moveTaskSrc, Utilities.getTableDesc(table),
+ partSpec.getPartSpec(), true, mmWriteId);
loadTableWork.setInheritTableSpecs(false);
+ // Do not commit the write ID from each task; need to commit once.
+ // TODO: we should just change the import to use a single MoveTask, like dynparts.
+ loadTableWork.setIntermediateInMmWrite(mmWriteId != null);
- MoveWork mv = new MoveWork(getInputs(), getOutputs(), loadTableWork, null, false);
- @SuppressWarnings("unchecked")
- Task<?> copyTask = TaskFactory.get(cw, conf), addPartTask = TaskFactory.get(dw, conf),
- loadPartTask = TaskFactory.get(mv, conf);
+ Task<?> loadPartTask = TaskFactory.get(new MoveWork(
- x.getInputs(), x.getOutputs(), loadTableWork, null, false),
- x.getConf());
++ x.getInputs(), x.getOutputs(), loadTableWork, null, false), x.getConf());
copyTask.addDependentTask(loadPartTask);
addPartTask.addDependentTask(loadPartTask);
- rootTasks.add(copyTask);
+ x.getTasks().add(copyTask);
+ if (commitTask != null) {
+ loadPartTask.addDependentTask(commitTask);
+ }
return addPartTask;
}
}
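The "commit once" comment above is the central constraint of this hunk: each partition gets its own copy-then-move chain, but every move task must feed the one shared ImportCommitTask so the write id is committed exactly once for the whole import. A toy sketch of that fan-in wiring, with SimpleTask as a stand-in for Hive's Task class:

    import java.util.ArrayList;
    import java.util.List;

    // Sketch only: SimpleTask mimics Task.addDependentTask() wiring.
    final class CommitFanInSketch {
      static final class SimpleTask {
        final String name;
        final List<SimpleTask> dependents = new ArrayList<>();
        SimpleTask(String name) { this.name = name; }
        void addDependentTask(SimpleTask t) { dependents.add(t); }
      }

      public static void main(String[] args) {
        SimpleTask commit = new SimpleTask("ImportCommit(writeId)");
        for (int p = 0; p < 3; p++) {
          SimpleTask copy = new SimpleTask("Copy(part" + p + ")");
          SimpleTask move = new SimpleTask("Move(part" + p + ")");
          copy.addDependentTask(move);   // each partition: copy, then load
          move.addDependentTask(commit); // all loads converge on one commit
        }
        System.out.println("three partition loads wired into a single commit task");
      }
    }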
@@@ -702,62 -702,69 +764,71 @@@
/**
* Create tasks for regular import, no repl complexity
+ * @param tblDesc create-table descriptor rebuilt from the exported metadata
+ * @param partitionDescs add-partition descriptors, one per exported partition
+ * @param isPartSpecSet whether the IMPORT statement specified a partition spec
+ * @param replicationSpec replication attributes of this import (not in repl scope here)
+ * @param table the existing target table, or null if it does not exist yet
+ * @param fromURI root of the export dump being imported
+ * @param fs filesystem of fromURI
+ * @param wh warehouse used to resolve default table/partition locations
++ * @param x wrapper around the analyzer context (conf, Hive, inputs/outputs, tasks)
++ * @param mmWriteId MM write id to import under, or null for non-MM tables
++ * @param isSourceMm whether the export came from an MM table
*/
- private void createRegularImportTasks(List<Task<? extends Serializable>> rootTasks,
+ private static void createRegularImportTasks(
- CreateTableDesc tblDesc,
- List<AddPartitionDesc> partitionDescs,
- boolean isPartSpecSet,
- ReplicationSpec replicationSpec,
- Table table, URI fromURI, FileSystem fs, Warehouse wh, EximUtil.SemanticAnalyzerWrapperContext x)
+ CreateTableDesc tblDesc, List<AddPartitionDesc> partitionDescs, boolean isPartSpecSet,
+ ReplicationSpec replicationSpec, Table table, URI fromURI, FileSystem fs, Warehouse wh,
- Long mmWriteId, boolean isSourceMm)
++ EximUtil.SemanticAnalyzerWrapperContext x, Long mmWriteId, boolean isSourceMm)
throws HiveException, URISyntaxException, IOException, MetaException {
- if (table != null){
+ if (table != null) {
if (table.isPartitioned()) {
- LOG.debug("table partitioned");
- Task<?> ict = createImportCommitTask(table.getDbName(), table.getTableName(), mmWriteId);
+ x.getLOG().debug("table partitioned");
++ Task<?> ict = createImportCommitTask(
++ table.getDbName(), table.getTableName(), mmWriteId, x.getConf());
for (AddPartitionDesc addPartitionDesc : partitionDescs) {
Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
- if ((ptn = db.getPartition(table, partSpec, false)) == null) {
- rootTasks.add(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc,
- replicationSpec, mmWriteId, isSourceMm, ict));
+ if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) {
+ x.getTasks().add(addSinglePartition(
- fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
++ fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x,
++ mmWriteId, isSourceMm, ict));
} else {
throw new SemanticException(
ErrorMsg.PARTITION_EXISTS.getMsg(partSpecToString(partSpec)));
}
}
-
} else {
- LOG.debug("table non-partitioned");
+ x.getLOG().debug("table non-partitioned");
// ensure the destination is empty; only checked for regular (non-repl) imports
Path tgtPath = new Path(table.getDataLocation().toString());
- FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), conf);
- checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec);
- loadTable(fromURI, table, false, tgtPath, mmWriteId, isSourceMm);
+ FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), x.getConf());
+ checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec, x);
- loadTable(fromURI, table, false, tgtPath, replicationSpec,x);
++ loadTable(fromURI, table, false, tgtPath, replicationSpec, x, mmWriteId, isSourceMm);
}
// Set this to read because we can't overwrite any existing partitions
- outputs.add(new WriteEntity(table, WriteEntity.WriteType.DDL_NO_LOCK));
+ x.getOutputs().add(new WriteEntity(table, WriteEntity.WriteType.DDL_NO_LOCK));
} else {
- LOG.debug("table " + tblDesc.getTableName() + " does not exist");
+ x.getLOG().debug("table " + tblDesc.getTableName() + " does not exist");
+ @SuppressWarnings("unchecked")
- Task<?> t = TaskFactory.get(new DDLWork(getInputs(), getOutputs(), tblDesc), conf);
+ Task<?> t = TaskFactory.get(new DDLWork(x.getInputs(), x.getOutputs(), tblDesc), x.getConf());
table = new Table(tblDesc.getDatabaseName(), tblDesc.getTableName());
- Database parentDb = db.getDatabase(tblDesc.getDatabaseName());
+ Database parentDb = x.getHive().getDatabase(tblDesc.getDatabaseName());
// Since we are going to be creating a new table in a db, we should mark that db as a write entity
// so that the auth framework can go to work there.
- outputs.add(new WriteEntity(parentDb, WriteEntity.WriteType.DDL_SHARED));
+ x.getOutputs().add(new WriteEntity(parentDb, WriteEntity.WriteType.DDL_SHARED));
if (isPartitioned(tblDesc)) {
+ Task<?> ict = createImportCommitTask(
- tblDesc.getDatabaseName(), tblDesc.getTableName(), mmWriteId);
++ tblDesc.getDatabaseName(), tblDesc.getTableName(), mmWriteId, x.getConf());
for (AddPartitionDesc addPartitionDesc : partitionDescs) {
- t.addDependentTask(
- addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
+ t.addDependentTask(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc,
- replicationSpec, mmWriteId, isSourceMm, ict));
++ replicationSpec, x, mmWriteId, isSourceMm, ict));
}
} else {
- LOG.debug("adding dependent CopyWork/MoveWork for table");
+ x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
if (tblDesc.isExternal() && (tblDesc.getLocation() == null)) {
- LOG.debug("Importing in place, no emptiness check, no copying/loading");
+ x.getLOG().debug("Importing in place, no emptiness check, no copying/loading");
Path dataPath = new Path(fromURI.toString(), "data");
tblDesc.setLocation(dataPath.toString());
} else {
@@@ -767,32 -774,27 +838,35 @@@
} else {
tablePath = wh.getTablePath(parentDb, tblDesc.getTableName());
}
- FileSystem tgtFs = FileSystem.get(tablePath.toUri(), conf);
- checkTargetLocationEmpty(tgtFs, tablePath, replicationSpec);
- t.addDependentTask(loadTable(fromURI, table, false, tablePath, mmWriteId, isSourceMm));
+ FileSystem tgtFs = FileSystem.get(tablePath.toUri(), x.getConf());
+ checkTargetLocationEmpty(tgtFs, tablePath, replicationSpec, x);
- t.addDependentTask(loadTable(fromURI, table, false, tablePath, replicationSpec, x));
++ t.addDependentTask(loadTable(
++ fromURI, table, false, tablePath, replicationSpec, x, mmWriteId, isSourceMm));
}
}
- rootTasks.add(t);
+ x.getTasks().add(t);
}
}
- private Task<?> createImportCommitTask(String dbName, String tblName, Long mmWriteId) {
++ private static Task<?> createImportCommitTask(
++ String dbName, String tblName, Long mmWriteId, HiveConf conf) {
+ @SuppressWarnings("unchecked")
+ Task<ImportCommitWork> ict = (mmWriteId == null) ? null : TaskFactory.get(
+ new ImportCommitWork(dbName, tblName, mmWriteId), conf);
+ return ict;
+ }
+
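Note the contract here: when mmWriteId is null the factory returns null rather than a no-op task, and callers such as addSinglePartition guard with an explicit null check before adding the dependency. The same nullable-task pattern in a minimal sketch, with Runnable standing in for Task<ImportCommitWork>:

    // Sketch only: Runnable stands in for Task<ImportCommitWork>.
    final class NullableCommitSketch {
      static Runnable createImportCommitTask(Long mmWriteId) {
        return mmWriteId == null ? null
            : () -> System.out.println("commit write id " + mmWriteId);
      }

      public static void main(String[] args) {
        Runnable ict = createImportCommitTask(5L);
        if (ict != null) { // mirrors "if (commitTask != null)" in addSinglePartition
          ict.run();
        }
      }
    }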
/**
* Create tasks for repl import
*/
- private void createReplImportTasks(List<Task<? extends Serializable>> rootTasks,
- CreateTableDesc tblDesc, List<AddPartitionDesc> partitionDescs, boolean isPartSpecSet,
- ReplicationSpec replicationSpec, Table table, URI fromURI, FileSystem fs, Warehouse wh,
- Long mmWriteId, boolean isSourceMm)
+ private static void createReplImportTasks(
+ CreateTableDesc tblDesc,
+ List<AddPartitionDesc> partitionDescs,
+ boolean isPartSpecSet, ReplicationSpec replicationSpec, boolean waitOnCreateDb,
+ Table table, URI fromURI, FileSystem fs, Warehouse wh,
- EximUtil.SemanticAnalyzerWrapperContext x)
++ EximUtil.SemanticAnalyzerWrapperContext x, Long mmWriteId, boolean isSourceMm)
throws HiveException, URISyntaxException, IOException, MetaException {
- Task dr = null;
+ Task<?> dr = null;
WriteEntity.WriteType lockType = WriteEntity.WriteType.DDL_NO_LOCK;
if ((table != null) && (isPartitioned(tblDesc) != table.isPartitioned())){
@@@ -840,16 -859,13 +931,15 @@@
if (!replicationSpec.isMetadataOnly()) {
if (isPartitioned(tblDesc)) {
+ Task<?> ict = createImportCommitTask(
- tblDesc.getDatabaseName(), tblDesc.getTableName(), mmWriteId);
++ tblDesc.getDatabaseName(), tblDesc.getTableName(), mmWriteId, x.getConf());
for (AddPartitionDesc addPartitionDesc : partitionDescs) {
- t.addDependentTask(addSinglePartition(fromURI, fs, tblDesc, table, wh,
- addPartitionDesc, replicationSpec, mmWriteId, isSourceMm, ict));
+ t.addDependentTask(
- addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
++ addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc,
++ replicationSpec, x, mmWriteId, isSourceMm, ict));
}
} else {
- LOG.debug("adding dependent CopyWork/MoveWork for table");
- t.addDependentTask(loadTable(
- fromURI, table, true, new Path(tblDesc.getLocation()), mmWriteId, isSourceMm));
+ x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
- t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()),replicationSpec, x));
++ t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()),
++ replicationSpec, x, mmWriteId, isSourceMm));
}
}
if (dr == null){
@@@ -863,27 -879,27 +953,27 @@@
} else {
// Table exists and is okay to replicate into; no need to drop and re-create.
if (table.isPartitioned()) {
- LOG.debug("table partitioned");
+ x.getLOG().debug("table partitioned");
for (AddPartitionDesc addPartitionDesc : partitionDescs) {
-
Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
-
+ Task<?> ict = replicationSpec.isMetadataOnly() ? null : createImportCommitTask(
- tblDesc.getDatabaseName(), tblDesc.getTableName(), mmWriteId);
- if ((ptn = db.getPartition(table, partSpec, false)) == null) {
++ tblDesc.getDatabaseName(), tblDesc.getTableName(), mmWriteId, x.getConf());
+ if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) {
if (!replicationSpec.isMetadataOnly()){
- rootTasks.add(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc,
- replicationSpec, mmWriteId, isSourceMm, ict));
+ x.getTasks().add(addSinglePartition(
- fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
++ fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x,
++ mmWriteId, isSourceMm, ict));
}
} else {
// If replicating, an already-existing partition means we may need to replace it,
// if the destination ptn's repl.last.id is older than the replacement's.
if (replicationSpec.allowReplacementInto(ptn)){
if (!replicationSpec.isMetadataOnly()){
- rootTasks.add(addSinglePartition(fromURI, fs, tblDesc, table, wh,
- addPartitionDesc, replicationSpec, mmWriteId, isSourceMm, ict));
+ x.getTasks().add(addSinglePartition(
- fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
++ fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x,
++ mmWriteId, isSourceMm, ict));
} else {
- rootTasks.add(alterSinglePartition(fromURI, fs, tblDesc, table, wh,
- addPartitionDesc, replicationSpec, ptn));
+ x.getTasks().add(alterSinglePartition(
+ fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x));
}
if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){
lockType = WriteEntity.WriteType.DDL_SHARED;
@@@ -907,10 -923,9 +997,10 @@@
return; // silently return, table is newer than our replacement.
}
if (!replicationSpec.isMetadataOnly()) {
- loadTable(fromURI, table, true, new Path(fromURI), replicationSpec, x); // repl-imports are replace-into
+ // repl-imports are replace-into
- loadTable(fromURI, table, true, new Path(fromURI), mmWriteId, isSourceMm);
++ loadTable(fromURI, table, true, new Path(fromURI), replicationSpec, x, mmWriteId, isSourceMm);
} else {
- rootTasks.add(alterTableTask(tblDesc));
+ x.getTasks().add(alterTableTask(tblDesc, x));
}
if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){
lockType = WriteEntity.WriteType.DDL_SHARED;
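Taken together, both loadTable() and addSinglePartition() now refuse to combine MM semantics with a replication-scope import before any tasks are built; until ReplCopyTask understands MM directory layouts, failing fast is the safer behavior. The guard, distilled into a stand-alone sketch:

    // Sketch only: distills the guard added in loadTable()/addSinglePartition().
    final class MmReplGuardSketch {
      static void checkMmReplicationSupported(boolean inReplicationScope,
          boolean isSourceMm, Long mmWriteId) {
        if (inReplicationScope && (isSourceMm || mmWriteId != null)) {
          throw new UnsupportedOperationException(
              "Replicating MM tables is not supported yet");
        }
      }

      public static void main(String[] args) {
        checkMmReplicationSupported(false, true, 0L); // regular MM import: allowed
        System.out.println("non-replication MM import passes the guard");
      }
    }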
http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
----------------------------------------------------------------------