You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/11/15 20:21:14 UTC

[42/50] [abbrv] hive git commit: HIVE-14671 : merge master into hive-14535 (exim conflicts merge 1) (Sergey Shelukhin)

http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
----------------------------------------------------------------------
diff --cc metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index b90de43,d420f06..b2e2484
--- a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@@ -12733,251 -12680,58 +12744,668 @@@ class GetValidWriteIdsRequest
        if ftype == TType.STOP:
          break
        if fid == 1:
 -        if ftype == TType.LIST:
 -          self.tables = []
 -          (_etype583, _size580) = iprot.readListBegin()
 -          for _i584 in xrange(_size580):
 -            _elem585 = Table()
 -            _elem585.read(iprot)
 -            self.tables.append(_elem585)
 +        if ftype == TType.STRING:
 +          self.dbName = iprot.readString()
 +        else:
 +          iprot.skip(ftype)
 +      elif fid == 2:
 +        if ftype == TType.STRING:
 +          self.tblName = iprot.readString()
 +        else:
 +          iprot.skip(ftype)
 +      else:
 +        iprot.skip(ftype)
 +      iprot.readFieldEnd()
 +    iprot.readStructEnd()
 +
 +  def write(self, oprot):
 +    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
 +      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
 +      return
 +    oprot.writeStructBegin('GetValidWriteIdsRequest')
 +    if self.dbName is not None:
 +      oprot.writeFieldBegin('dbName', TType.STRING, 1)
 +      oprot.writeString(self.dbName)
 +      oprot.writeFieldEnd()
 +    if self.tblName is not None:
 +      oprot.writeFieldBegin('tblName', TType.STRING, 2)
 +      oprot.writeString(self.tblName)
 +      oprot.writeFieldEnd()
 +    oprot.writeFieldStop()
 +    oprot.writeStructEnd()
 +
 +  def validate(self):
 +    if self.dbName is None:
 +      raise TProtocol.TProtocolException(message='Required field dbName is unset!')
 +    if self.tblName is None:
 +      raise TProtocol.TProtocolException(message='Required field tblName is unset!')
 +    return
 +
 +
 +  def __hash__(self):
 +    value = 17
 +    value = (value * 31) ^ hash(self.dbName)
 +    value = (value * 31) ^ hash(self.tblName)
 +    return value
 +
 +  def __repr__(self):
 +    L = ['%s=%r' % (key, value)
 +      for key, value in self.__dict__.iteritems()]
 +    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
 +
 +  def __eq__(self, other):
 +    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
 +
 +  def __ne__(self, other):
 +    return not (self == other)
 +
 +class GetValidWriteIdsResult:
 +  """
 +  Attributes:
 +   - lowWatermarkId
 +   - highWatermarkId
 +   - areIdsValid
 +   - ids
 +  """
 +
 +  thrift_spec = (
 +    None, # 0
 +    (1, TType.I64, 'lowWatermarkId', None, None, ), # 1
 +    (2, TType.I64, 'highWatermarkId', None, None, ), # 2
 +    (3, TType.BOOL, 'areIdsValid', None, None, ), # 3
 +    (4, TType.LIST, 'ids', (TType.I64,None), None, ), # 4
 +  )
 +
 +  def __init__(self, lowWatermarkId=None, highWatermarkId=None, areIdsValid=None, ids=None,):
 +    self.lowWatermarkId = lowWatermarkId
 +    self.highWatermarkId = highWatermarkId
 +    self.areIdsValid = areIdsValid
 +    self.ids = ids
 +
 +  def read(self, iprot):
 +    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
 +      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
 +      return
 +    iprot.readStructBegin()
 +    while True:
 +      (fname, ftype, fid) = iprot.readFieldBegin()
 +      if ftype == TType.STOP:
 +        break
 +      if fid == 1:
 +        if ftype == TType.I64:
 +          self.lowWatermarkId = iprot.readI64()
 +        else:
 +          iprot.skip(ftype)
 +      elif fid == 2:
 +        if ftype == TType.I64:
 +          self.highWatermarkId = iprot.readI64()
 +        else:
 +          iprot.skip(ftype)
 +      elif fid == 3:
 +        if ftype == TType.BOOL:
 +          self.areIdsValid = iprot.readBool()
 +        else:
 +          iprot.skip(ftype)
 +      elif fid == 4:
 +        if ftype == TType.LIST:
 +          self.ids = []
 +          (_etype562, _size559) = iprot.readListBegin()
 +          for _i563 in xrange(_size559):
 +            _elem564 = iprot.readI64()
 +            self.ids.append(_elem564)
 +          iprot.readListEnd()
 +        else:
 +          iprot.skip(ftype)
 +      else:
 +        iprot.skip(ftype)
 +      iprot.readFieldEnd()
 +    iprot.readStructEnd()
 +
 +  def write(self, oprot):
 +    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
 +      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
 +      return
 +    oprot.writeStructBegin('GetValidWriteIdsResult')
 +    if self.lowWatermarkId is not None:
 +      oprot.writeFieldBegin('lowWatermarkId', TType.I64, 1)
 +      oprot.writeI64(self.lowWatermarkId)
 +      oprot.writeFieldEnd()
 +    if self.highWatermarkId is not None:
 +      oprot.writeFieldBegin('highWatermarkId', TType.I64, 2)
 +      oprot.writeI64(self.highWatermarkId)
 +      oprot.writeFieldEnd()
 +    if self.areIdsValid is not None:
 +      oprot.writeFieldBegin('areIdsValid', TType.BOOL, 3)
 +      oprot.writeBool(self.areIdsValid)
 +      oprot.writeFieldEnd()
 +    if self.ids is not None:
 +      oprot.writeFieldBegin('ids', TType.LIST, 4)
 +      oprot.writeListBegin(TType.I64, len(self.ids))
 +      for iter565 in self.ids:
 +        oprot.writeI64(iter565)
 +      oprot.writeListEnd()
 +      oprot.writeFieldEnd()
 +    oprot.writeFieldStop()
 +    oprot.writeStructEnd()
 +
 +  def validate(self):
 +    if self.lowWatermarkId is None:
 +      raise TProtocol.TProtocolException(message='Required field lowWatermarkId is unset!')
 +    if self.highWatermarkId is None:
 +      raise TProtocol.TProtocolException(message='Required field highWatermarkId is unset!')
 +    return
 +
 +
 +  def __hash__(self):
 +    value = 17
 +    value = (value * 31) ^ hash(self.lowWatermarkId)
 +    value = (value * 31) ^ hash(self.highWatermarkId)
 +    value = (value * 31) ^ hash(self.areIdsValid)
 +    value = (value * 31) ^ hash(self.ids)
 +    return value
 +
 +  def __repr__(self):
 +    L = ['%s=%r' % (key, value)
 +      for key, value in self.__dict__.iteritems()]
 +    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
 +
 +  def __eq__(self, other):
 +    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
 +
 +  def __ne__(self, other):
 +    return not (self == other)
 +
 +class GetAllFunctionsResponse:
 +  """
 +  Attributes:
 +   - functions
 +  """
 +
 +  thrift_spec = (
 +    None, # 0
 +    (1, TType.LIST, 'functions', (TType.STRUCT,(Function, Function.thrift_spec)), None, ), # 1
 +  )
 +
 +  def __init__(self, functions=None,):
 +    self.functions = functions
 +
 +  def read(self, iprot):
 +    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
 +      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
 +      return
 +    iprot.readStructBegin()
 +    while True:
 +      (fname, ftype, fid) = iprot.readFieldBegin()
 +      if ftype == TType.STOP:
 +        break
 +      if fid == 1:
 +        if ftype == TType.LIST:
 +          self.functions = []
 +          (_etype569, _size566) = iprot.readListBegin()
 +          for _i570 in xrange(_size566):
 +            _elem571 = Function()
 +            _elem571.read(iprot)
 +            self.functions.append(_elem571)
 +          iprot.readListEnd()
 +        else:
 +          iprot.skip(ftype)
 +      else:
 +        iprot.skip(ftype)
 +      iprot.readFieldEnd()
 +    iprot.readStructEnd()
 +
 +  def write(self, oprot):
 +    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
 +      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
 +      return
 +    oprot.writeStructBegin('GetAllFunctionsResponse')
 +    if self.functions is not None:
 +      oprot.writeFieldBegin('functions', TType.LIST, 1)
 +      oprot.writeListBegin(TType.STRUCT, len(self.functions))
 +      for iter572 in self.functions:
 +        iter572.write(oprot)
 +      oprot.writeListEnd()
 +      oprot.writeFieldEnd()
 +    oprot.writeFieldStop()
 +    oprot.writeStructEnd()
 +
 +  def validate(self):
 +    return
 +
 +
 +  def __hash__(self):
 +    value = 17
 +    value = (value * 31) ^ hash(self.functions)
 +    return value
 +
 +  def __repr__(self):
 +    L = ['%s=%r' % (key, value)
 +      for key, value in self.__dict__.iteritems()]
 +    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
 +
 +  def __eq__(self, other):
 +    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
 +
 +  def __ne__(self, other):
 +    return not (self == other)
 +
++class ClientCapabilities:
++  """
++  Attributes:
++   - values
++  """
++
++  thrift_spec = (
++    None, # 0
++    (1, TType.LIST, 'values', (TType.I32,None), None, ), # 1
++  )
++
++  def __init__(self, values=None,):
++    self.values = values
++
++  def read(self, iprot):
++    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
++      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
++      return
++    iprot.readStructBegin()
++    while True:
++      (fname, ftype, fid) = iprot.readFieldBegin()
++      if ftype == TType.STOP:
++        break
++      if fid == 1:
++        if ftype == TType.LIST:
++          self.values = []
++          (_etype576, _size573) = iprot.readListBegin()
++          for _i577 in xrange(_size573):
++            _elem578 = iprot.readI32()
++            self.values.append(_elem578)
++          iprot.readListEnd()
++        else:
++          iprot.skip(ftype)
++      else:
++        iprot.skip(ftype)
++      iprot.readFieldEnd()
++    iprot.readStructEnd()
++
++  def write(self, oprot):
++    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
++      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
++      return
++    oprot.writeStructBegin('ClientCapabilities')
++    if self.values is not None:
++      oprot.writeFieldBegin('values', TType.LIST, 1)
++      oprot.writeListBegin(TType.I32, len(self.values))
++      for iter579 in self.values:
++        oprot.writeI32(iter579)
++      oprot.writeListEnd()
++      oprot.writeFieldEnd()
++    oprot.writeFieldStop()
++    oprot.writeStructEnd()
++
++  def validate(self):
++    if self.values is None:
++      raise TProtocol.TProtocolException(message='Required field values is unset!')
++    return
++
++
++  def __hash__(self):
++    value = 17
++    value = (value * 31) ^ hash(self.values)
++    return value
++
++  def __repr__(self):
++    L = ['%s=%r' % (key, value)
++      for key, value in self.__dict__.iteritems()]
++    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
++
++  def __eq__(self, other):
++    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
++
++  def __ne__(self, other):
++    return not (self == other)
++
++class GetTableRequest:
++  """
++  Attributes:
++   - dbName
++   - tblName
++   - capabilities
++  """
++
++  thrift_spec = (
++    None, # 0
++    (1, TType.STRING, 'dbName', None, None, ), # 1
++    (2, TType.STRING, 'tblName', None, None, ), # 2
++    (3, TType.STRUCT, 'capabilities', (ClientCapabilities, ClientCapabilities.thrift_spec), None, ), # 3
++  )
++
++  def __init__(self, dbName=None, tblName=None, capabilities=None,):
++    self.dbName = dbName
++    self.tblName = tblName
++    self.capabilities = capabilities
++
++  def read(self, iprot):
++    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
++      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
++      return
++    iprot.readStructBegin()
++    while True:
++      (fname, ftype, fid) = iprot.readFieldBegin()
++      if ftype == TType.STOP:
++        break
++      if fid == 1:
++        if ftype == TType.STRING:
++          self.dbName = iprot.readString()
++        else:
++          iprot.skip(ftype)
++      elif fid == 2:
++        if ftype == TType.STRING:
++          self.tblName = iprot.readString()
++        else:
++          iprot.skip(ftype)
++      elif fid == 3:
++        if ftype == TType.STRUCT:
++          self.capabilities = ClientCapabilities()
++          self.capabilities.read(iprot)
++        else:
++          iprot.skip(ftype)
++      else:
++        iprot.skip(ftype)
++      iprot.readFieldEnd()
++    iprot.readStructEnd()
++
++  def write(self, oprot):
++    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
++      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
++      return
++    oprot.writeStructBegin('GetTableRequest')
++    if self.dbName is not None:
++      oprot.writeFieldBegin('dbName', TType.STRING, 1)
++      oprot.writeString(self.dbName)
++      oprot.writeFieldEnd()
++    if self.tblName is not None:
++      oprot.writeFieldBegin('tblName', TType.STRING, 2)
++      oprot.writeString(self.tblName)
++      oprot.writeFieldEnd()
++    if self.capabilities is not None:
++      oprot.writeFieldBegin('capabilities', TType.STRUCT, 3)
++      self.capabilities.write(oprot)
++      oprot.writeFieldEnd()
++    oprot.writeFieldStop()
++    oprot.writeStructEnd()
++
++  def validate(self):
++    if self.dbName is None:
++      raise TProtocol.TProtocolException(message='Required field dbName is unset!')
++    if self.tblName is None:
++      raise TProtocol.TProtocolException(message='Required field tblName is unset!')
++    return
++
++
++  def __hash__(self):
++    value = 17
++    value = (value * 31) ^ hash(self.dbName)
++    value = (value * 31) ^ hash(self.tblName)
++    value = (value * 31) ^ hash(self.capabilities)
++    return value
++
++  def __repr__(self):
++    L = ['%s=%r' % (key, value)
++      for key, value in self.__dict__.iteritems()]
++    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
++
++  def __eq__(self, other):
++    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
++
++  def __ne__(self, other):
++    return not (self == other)
++
++class GetTableResult:
++  """
++  Attributes:
++   - table
++  """
++
++  thrift_spec = (
++    None, # 0
++    (1, TType.STRUCT, 'table', (Table, Table.thrift_spec), None, ), # 1
++  )
++
++  def __init__(self, table=None,):
++    self.table = table
++
++  def read(self, iprot):
++    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
++      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
++      return
++    iprot.readStructBegin()
++    while True:
++      (fname, ftype, fid) = iprot.readFieldBegin()
++      if ftype == TType.STOP:
++        break
++      if fid == 1:
++        if ftype == TType.STRUCT:
++          self.table = Table()
++          self.table.read(iprot)
++        else:
++          iprot.skip(ftype)
++      else:
++        iprot.skip(ftype)
++      iprot.readFieldEnd()
++    iprot.readStructEnd()
++
++  def write(self, oprot):
++    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
++      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
++      return
++    oprot.writeStructBegin('GetTableResult')
++    if self.table is not None:
++      oprot.writeFieldBegin('table', TType.STRUCT, 1)
++      self.table.write(oprot)
++      oprot.writeFieldEnd()
++    oprot.writeFieldStop()
++    oprot.writeStructEnd()
++
++  def validate(self):
++    if self.table is None:
++      raise TProtocol.TProtocolException(message='Required field table is unset!')
++    return
++
++
++  def __hash__(self):
++    value = 17
++    value = (value * 31) ^ hash(self.table)
++    return value
++
++  def __repr__(self):
++    L = ['%s=%r' % (key, value)
++      for key, value in self.__dict__.iteritems()]
++    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
++
++  def __eq__(self, other):
++    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
++
++  def __ne__(self, other):
++    return not (self == other)
++
++class GetTablesRequest:
++  """
++  Attributes:
++   - dbName
++   - tblNames
++   - capabilities
++  """
++
++  thrift_spec = (
++    None, # 0
++    (1, TType.STRING, 'dbName', None, None, ), # 1
++    (2, TType.LIST, 'tblNames', (TType.STRING,None), None, ), # 2
++    (3, TType.STRUCT, 'capabilities', (ClientCapabilities, ClientCapabilities.thrift_spec), None, ), # 3
++  )
++
++  def __init__(self, dbName=None, tblNames=None, capabilities=None,):
++    self.dbName = dbName
++    self.tblNames = tblNames
++    self.capabilities = capabilities
++
++  def read(self, iprot):
++    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
++      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
++      return
++    iprot.readStructBegin()
++    while True:
++      (fname, ftype, fid) = iprot.readFieldBegin()
++      if ftype == TType.STOP:
++        break
++      if fid == 1:
++        if ftype == TType.STRING:
++          self.dbName = iprot.readString()
++        else:
++          iprot.skip(ftype)
++      elif fid == 2:
++        if ftype == TType.LIST:
++          self.tblNames = []
++          (_etype583, _size580) = iprot.readListBegin()
++          for _i584 in xrange(_size580):
++            _elem585 = iprot.readString()
++            self.tblNames.append(_elem585)
++          iprot.readListEnd()
++        else:
++          iprot.skip(ftype)
++      elif fid == 3:
++        if ftype == TType.STRUCT:
++          self.capabilities = ClientCapabilities()
++          self.capabilities.read(iprot)
++        else:
++          iprot.skip(ftype)
++      else:
++        iprot.skip(ftype)
++      iprot.readFieldEnd()
++    iprot.readStructEnd()
++
++  def write(self, oprot):
++    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
++      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
++      return
++    oprot.writeStructBegin('GetTablesRequest')
++    if self.dbName is not None:
++      oprot.writeFieldBegin('dbName', TType.STRING, 1)
++      oprot.writeString(self.dbName)
++      oprot.writeFieldEnd()
++    if self.tblNames is not None:
++      oprot.writeFieldBegin('tblNames', TType.LIST, 2)
++      oprot.writeListBegin(TType.STRING, len(self.tblNames))
++      for iter586 in self.tblNames:
++        oprot.writeString(iter586)
++      oprot.writeListEnd()
++      oprot.writeFieldEnd()
++    if self.capabilities is not None:
++      oprot.writeFieldBegin('capabilities', TType.STRUCT, 3)
++      self.capabilities.write(oprot)
++      oprot.writeFieldEnd()
++    oprot.writeFieldStop()
++    oprot.writeStructEnd()
++
++  def validate(self):
++    if self.dbName is None:
++      raise TProtocol.TProtocolException(message='Required field dbName is unset!')
++    return
++
++
++  def __hash__(self):
++    value = 17
++    value = (value * 31) ^ hash(self.dbName)
++    value = (value * 31) ^ hash(self.tblNames)
++    value = (value * 31) ^ hash(self.capabilities)
++    return value
++
++  def __repr__(self):
++    L = ['%s=%r' % (key, value)
++      for key, value in self.__dict__.iteritems()]
++    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
++
++  def __eq__(self, other):
++    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
++
++  def __ne__(self, other):
++    return not (self == other)
++
++class GetTablesResult:
++  """
++  Attributes:
++   - tables
++  """
++
++  thrift_spec = (
++    None, # 0
++    (1, TType.LIST, 'tables', (TType.STRUCT,(Table, Table.thrift_spec)), None, ), # 1
++  )
++
++  def __init__(self, tables=None,):
++    self.tables = tables
++
++  def read(self, iprot):
++    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
++      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
++      return
++    iprot.readStructBegin()
++    while True:
++      (fname, ftype, fid) = iprot.readFieldBegin()
++      if ftype == TType.STOP:
++        break
++      if fid == 1:
++        if ftype == TType.LIST:
++          self.tables = []
++          (_etype590, _size587) = iprot.readListBegin()
++          for _i591 in xrange(_size587):
++            _elem592 = Table()
++            _elem592.read(iprot)
++            self.tables.append(_elem592)
+           iprot.readListEnd()
+         else:
+           iprot.skip(ftype)
+       else:
+         iprot.skip(ftype)
+       iprot.readFieldEnd()
+     iprot.readStructEnd()
+ 
+   def write(self, oprot):
+     if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+       oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+       return
+     oprot.writeStructBegin('GetTablesResult')
+     if self.tables is not None:
+       oprot.writeFieldBegin('tables', TType.LIST, 1)
+       oprot.writeListBegin(TType.STRUCT, len(self.tables))
 -      for iter586 in self.tables:
 -        iter586.write(oprot)
++      for iter593 in self.tables:
++        iter593.write(oprot)
+       oprot.writeListEnd()
+       oprot.writeFieldEnd()
+     oprot.writeFieldStop()
+     oprot.writeStructEnd()
+ 
+   def validate(self):
+     if self.tables is None:
+       raise TProtocol.TProtocolException(message='Required field tables is unset!')
+     return
+ 
+ 
+   def __hash__(self):
+     value = 17
+     value = (value * 31) ^ hash(self.tables)
+     return value
+ 
+   def __repr__(self):
+     L = ['%s=%r' % (key, value)
+       for key, value in self.__dict__.iteritems()]
+     return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+ 
+   def __eq__(self, other):
+     return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+ 
+   def __ne__(self, other):
+     return not (self == other)
+ 
  class TableMeta:
    """
    Attributes:

http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --cc metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 2801192,c0ef25e..3b0f368
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@@ -119,13 -236,9 +126,10 @@@ import org.slf4j.Logger
  import org.slf4j.LoggerFactory;
  
  import javax.jdo.JDOException;
- 
  import java.io.IOException;
- import java.net.InetAddress;
- import java.net.InetSocketAddress;
- import java.net.UnknownHostException;
  import java.nio.ByteBuffer;
 +import java.sql.SQLException;
+ import java.security.PrivilegedExceptionAction;
  import java.text.DateFormat;
  import java.text.SimpleDateFormat;
  import java.util.AbstractMap;
@@@ -2272,18 -2531,34 +2378,34 @@@ public class HiveMetaStore extends Thri
              continue;
            }
  
+           final UserGroupInformation ugi;
+           try {
+             ugi = UserGroupInformation.getCurrentUser();
+           } catch (IOException e) {
+             throw new RuntimeException(e);
+           }
  
 -          partFutures.add(threadPool.submit(new Callable() {
 +          partFutures.add(threadPool.submit(new Callable<Partition>() {
              @Override
              public Partition call() throws Exception {
-               boolean madeDir = createLocationForAddedPartition(table, part);
-               if (addedPartitions.put(new PartValEqWrapper(part), madeDir) != null) {
-                 // Technically, for ifNotExists case, we could insert one and discard the other
-                 // because the first one now "exists", but it seems better to report the problem
-                 // upstream as such a command doesn't make sense.
-                 throw new MetaException("Duplicate partitions in the list: " + part);
-               }
-               initializeAddedPartition(table, part, madeDir);
+               ugi.doAs(new PrivilegedExceptionAction<Object>() {
+                 @Override
+                 public Object run() throws Exception {
+                   try {
+                     boolean madeDir = createLocationForAddedPartition(table, part);
+                     if (addedPartitions.put(new PartValEqWrapper(part), madeDir) != null) {
+                       // Technically, for ifNotExists case, we could insert one and discard the other
+                       // because the first one now "exists", but it seems better to report the problem
+                       // upstream as such a command doesn't make sense.
+                       throw new MetaException("Duplicate partitions in the list: " + part);
+                     }
+                     initializeAddedPartition(table, part, madeDir);
+                   } catch (MetaException e) {
+                     throw new IOException(e.getMessage(), e);
+                   }
+                   return null;
+                 }
+               });
                return part;
              }
            }));

http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --cc metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index b8b81ed,2f59054..f698125
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@@ -28,8 -30,8 +30,9 @@@ import org.apache.hadoop.hive.common.cl
  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
  import org.apache.hadoop.hive.conf.HiveConfUtil;
 +import org.apache.hadoop.hive.metastore.api.*;
  import org.apache.hadoop.hive.metastore.TableType;
+ import org.apache.hadoop.hive.metastore.api.*;
  import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
  import org.apache.hadoop.hive.metastore.txn.TxnUtils;
  import org.apache.hadoop.hive.shims.ShimLoader;

http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 0000000,4c0f817..46c817a
mode 000000,100644..100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@@ -1,0 -1,250 +1,251 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.hadoop.hive.ql.exec;
+ 
+ import org.apache.hadoop.fs.FSDataInputStream;
+ import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+ import org.apache.hadoop.hive.ql.plan.CopyWork;
+ import org.apache.hadoop.hive.ql.plan.ReplCopyWork;
+ 
+ import java.io.BufferedReader;
+ import java.io.BufferedWriter;
+ import java.io.IOException;
+ import java.io.InputStreamReader;
+ import java.io.OutputStream;
+ import java.io.OutputStreamWriter;
+ import java.io.Serializable;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.List;
+ 
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import org.apache.hadoop.fs.FileStatus;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.hive.common.FileUtils;
+ import org.apache.hadoop.hive.conf.HiveConf;
+ import org.apache.hadoop.hive.ql.DriverContext;
+ import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer;
+ import org.apache.hadoop.hive.ql.plan.api.StageType;
+ import org.apache.hadoop.util.StringUtils;
+ 
+ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
+ 
+ 
+   private static final long serialVersionUID = 1L;
+ 
+   private static transient final Logger LOG = LoggerFactory.getLogger(ReplCopyTask.class);
+ 
+   public ReplCopyTask(){
+     super();
+   }
+ 
+   @Override
+   protected int execute(DriverContext driverContext) {
+     LOG.debug("ReplCopyTask.execute()");
+     FileSystem dstFs = null;
+     Path toPath = null;
+     try {
 -      Path fromPath = work.getFromPath();
 -      toPath = work.getToPath();
++      // TODO# merge with real CopyTask logic
++      Path fromPath = work.getFromPaths()[0];
++      toPath = work.getToPaths()[0];
+ 
+       console.printInfo("Copying data from " + fromPath.toString(), " to "
+           + toPath.toString());
+ 
+       ReplCopyWork rwork = ((ReplCopyWork)work);
+ 
+       FileSystem srcFs = fromPath.getFileSystem(conf);
+       dstFs = toPath.getFileSystem(conf);
+ 
+       List<FileStatus> srcFiles = new ArrayList<FileStatus>();
+       FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(srcFs, fromPath);
+       LOG.debug("ReplCopyTasks srcs=" + (srcs == null ? "null" : srcs.length));
+       if (! rwork.getReadListFromInput()){
+         if (srcs == null || srcs.length == 0) {
+           if (work.isErrorOnSrcEmpty()) {
+             console.printError("No files matching path: " + fromPath.toString());
+             return 3;
+           } else {
+             return 0;
+           }
+         }
+       } else {
+         LOG.debug("ReplCopyTask making sense of _files");
+         // Our input is probably the result of a _files listing, we should expand out _files.
+         srcFiles = filesInFileListing(srcFs,fromPath);
+         LOG.debug("ReplCopyTask _files contains:" + (srcFiles == null ? "null" : srcFiles.size()));
+         if (srcFiles == null){
+           if (work.isErrorOnSrcEmpty()) {
+             console.printError("No _files entry found on source: " + fromPath.toString());
+             return 5;
+           } else {
+             return 0;
+           }
+         }
+       }
+       // Add in all the lone filecopies expected as well - applies to
+       // both _files case stragglers and regular copies
+       srcFiles.addAll(Arrays.asList(srcs));
+       LOG.debug("ReplCopyTask numFiles:" + (srcFiles == null ? "null" : srcFiles.size()));
+ 
+       boolean inheritPerms = conf.getBoolVar(HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
+       if (!FileUtils.mkdir(dstFs, toPath, inheritPerms, conf)) {
+         console.printError("Cannot make target directory: " + toPath.toString());
+         return 2;
+       }
+ 
+       BufferedWriter listBW = null;
+       if (rwork.getListFilesOnOutputBehaviour()){
+         Path listPath = new Path(toPath,"_files");
+         LOG.debug("ReplCopyTask : generating _files at :" + listPath.toUri().toString());
+         if (dstFs.exists(listPath)){
+           console.printError("Cannot make target _files file:" + listPath.toString());
+           return 4;
+         }
+         listBW = new BufferedWriter(new OutputStreamWriter(dstFs.create(listPath)));
+         // TODO : verify that not specifying charset here does not bite us
+         // later(for cases where filenames have unicode chars)
+       }
+ 
+       for (FileStatus oneSrc : srcFiles) {
+         console.printInfo("Copying file: " + oneSrc.getPath().toString());
+         LOG.debug("Copying file: " + oneSrc.getPath().toString());
+         if (!rwork.getListFilesOnOutputBehaviour(oneSrc)){
+           FileSystem actualSrcFs = null;
+           if (rwork.getReadListFromInput()){
+             // TODO : filesystemcache prevents this from being a perf nightmare, but we
+             // should still probably follow up to see if we need to do something better here.
+             actualSrcFs = oneSrc.getPath().getFileSystem(conf);
+           } else {
+             actualSrcFs = srcFs;
+           }
+ 
+           LOG.debug("ReplCopyTask :cp:" + oneSrc.getPath() + "=>" + toPath);
+           if (!FileUtils.copy(actualSrcFs, oneSrc.getPath(), dstFs, toPath,
+             false, // delete source
+             true, // overwrite destination
+             conf)) {
+           console.printError("Failed to copy: '" + oneSrc.getPath().toString()
+               + "to: '" + toPath.toString() + "'");
+           return 1;
+           }
+         }else{
+           LOG.debug("ReplCopyTask _files now tracks:" + oneSrc.getPath().toUri());
+           console.printInfo("Tracking file: " + oneSrc.getPath().toUri());
+           listBW.write(oneSrc.getPath().toUri().toString() + "\n");
+         }
+       }
+ 
+       if (listBW != null){
+         listBW.close();
+       }
+ 
+       return 0;
+ 
+     } catch (Exception e) {
+       console.printError("Failed with exception " + e.getMessage(), "\n"
+           + StringUtils.stringifyException(e));
+       return (1);
+     }
+   }
+ 
+ 
+   private List<FileStatus> filesInFileListing(FileSystem fs, Path path)
+       throws IOException {
+     Path fileListing = new Path(path, "_files");
+     LOG.debug("ReplCopyTask filesInFileListing() reading " + fileListing.toUri());
+     if (! fs.exists(fileListing)){
+       LOG.debug("ReplCopyTask : _files does not exist");
+       return null; // Returning null from this fn can serve as an err condition.
+       // On success, but with nothing to return, we can return an empty list.
+     }
+ 
+     List<FileStatus> ret = new ArrayList<FileStatus>();
+     BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(fileListing)));
+     // TODO : verify if skipping charset here is okay
+ 
+     String line = null;
+     while ( (line = br.readLine()) != null){
+       LOG.debug("ReplCopyTask :_filesReadLine:" + line);
+       Path p = new Path(line);
+       FileSystem srcFs = p.getFileSystem(conf); // TODO : again, fs cache should make this okay, but if not, revisit
+       ret.add(srcFs.getFileStatus(p));
+       // Note - we need srcFs rather than fs, because it is possible that the _files lists files
+       // which are from a different filesystem than the fs where the _files file itself was loaded
+       // from. Currently, it is possible, for eg., to do REPL LOAD hdfs://<ip>/dir/ and for the _files
+       // in it to contain hdfs://<name>/ entries, and/or vice-versa, and this causes errors.
+       // It might also be possible that there will be a mix of them in a given _files file.
+       // TODO: revisit close to the end of replv2 dev, to see if our assumption now still holds,
+       // and if not so, optimize.
+     }
+ 
+     return ret;
+   }
+ 
+   @Override
+   public StageType getType() {
+     return StageType.COPY;
+     // there's no extensive need for this to have its own type - it mirrors
+     // the intent of copy enough. This might change later, though.
+   }
+ 
+   @Override
+   public String getName() {
+     return "REPL_COPY";
+   }
+ 
+   public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, HiveConf conf) {
+     Task<?> copyTask = null;
+     LOG.debug("ReplCopyTask:getLoadCopyTask: "+srcPath + "=>" + dstPath);
+     if (replicationSpec.isInReplicationScope()){
+       ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false);
+       LOG.debug("ReplCopyTask:\trcwork");
+       if (replicationSpec.isLazy()){
+         LOG.debug("ReplCopyTask:\tlazy");
+         rcwork.setReadListFromInput(true);
+       }
+       copyTask = TaskFactory.get(rcwork, conf);
+     } else {
+       LOG.debug("ReplCopyTask:\tcwork");
+       copyTask = TaskFactory.get(new CopyWork(srcPath, dstPath, false), conf);
+     }
+     return copyTask;
+   }
+ 
+   public static Task<?> getDumpCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, HiveConf conf) {
+     Task<?> copyTask = null;
+     LOG.debug("ReplCopyTask:getDumpCopyTask: "+srcPath + "=>" + dstPath);
+     if (replicationSpec.isInReplicationScope()){
+       ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false);
+       LOG.debug("ReplCopyTask:\trcwork");
+       if (replicationSpec.isLazy()){
+         LOG.debug("ReplCopyTask:\tlazy");
+         rcwork.setListFilesOnOutputBehaviour(true);
+       }
+       copyTask = TaskFactory.get(rcwork, conf);
+     } else {
+       LOG.debug("ReplCopyTask:\tcwork");
+       copyTask = TaskFactory.get(new CopyWork(srcPath, dstPath, false), conf);
+     }
+     return copyTask;
+   }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 40d055a,867e445..7e1e806
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@@ -318,9 -318,10 +318,10 @@@ public class DbTxnManager extends HiveT
            // This is a file or something we don't hold locks for.
            continue;
        }
 -      if(t != null && AcidUtils.isAcidTable(t)) {
 +      if(t != null && AcidUtils.isFullAcidTable(t)) {
          compBuilder.setIsAcid(true);
        }
+       compBuilder.setIsDynamicPartitionWrite(output.isDynamicPartitionWrite());
        LockComponent comp = compBuilder.build();
        LOG.debug("Adding lock component to lock request " + comp.toString());
        rqstBuilder.addLockComponent(comp);

http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
index 12dea9c,f61274b..b46f615
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
@@@ -32,23 -22,28 +32,31 @@@ import java.io.FileNotFoundException
  import java.io.IOException;
  import java.io.Serializable;
  import java.net.URI;
+ import java.util.HashSet;
+ import java.util.List;
  
  import org.antlr.runtime.tree.Tree;
++import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.common.FileUtils;
  import org.apache.hadoop.hive.conf.HiveConf;
+ import org.apache.hadoop.hive.ql.Context;
  import org.apache.hadoop.hive.ql.ErrorMsg;
  import org.apache.hadoop.hive.ql.QueryState;
+ import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
  import org.apache.hadoop.hive.ql.exec.Task;
 +import org.apache.hadoop.hive.ql.exec.TaskFactory;
  import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+ import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+ import org.apache.hadoop.hive.ql.metadata.Hive;
  import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
  import org.apache.hadoop.hive.ql.metadata.Partition;
  import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
  import org.apache.hadoop.hive.ql.metadata.Table;
 +import org.apache.hadoop.hive.ql.plan.CopyWork;
+ import org.slf4j.Logger;
  
  /**
   * ExportSemanticAnalyzer.
@@@ -197,53 -195,22 +218,75 @@@ public class ExportSemanticAnalyzer ext
          for (Partition partition : partitions) {
            Path fromPath = partition.getDataLocation();
            Path toPartPath = new Path(parentPath, partition.getName());
-           CopyWork cw = createCopyWork(isMmTable, lbLevels, ids, fromPath, toPartPath);
-           rootTasks.add(TaskFactory.get(cw, conf));
 -          Task<? extends Serializable> rTask =
 -              ReplCopyTask.getDumpCopyTask(replicationSpec, fromPath, toPartPath, conf);
 -          rootTasks.add(rTask);
++          Task<?> copyTask = null;
++          if (replicationSpec.isInReplicationScope()) {
++            if (isMmTable) {
++              // TODO: ReplCopyTask is completely screwed. Need to support when it's not as screwed.
++              throw new SemanticException(
++                  "Not supported right now because Replication is completely screwed");
++            }
++            copyTask = ReplCopyTask.getDumpCopyTask(replicationSpec, fromPath, toPartPath, conf);
++          } else {
++            CopyWork cw = createCopyWork(isMmTable, lbLevels, ids, fromPath, toPartPath, conf);
++            copyTask = TaskFactory.get(cw, conf);
++          }
++          rootTasks.add(copyTask);
            inputs.add(new ReadEntity(partition));
          }
        } else {
          Path fromPath = ts.tableHandle.getDataLocation();
          Path toDataPath = new Path(parentPath, "data");
-         CopyWork cw = createCopyWork(isMmTable, lbLevels, ids, fromPath, toDataPath);
-         rootTasks.add(TaskFactory.get(cw, conf));
 -        Task<? extends Serializable> rTask =
 -            ReplCopyTask.getDumpCopyTask(replicationSpec, fromPath, toDataPath, conf);
 -        rootTasks.add(rTask);
++        Task<?> copyTask = null;
++        if (replicationSpec.isInReplicationScope()) {
++          if (isMmTable) {
++            // TODO: ReplCopyTask is completely screwed. Need to support when it's not as screwed.
++            throw new SemanticException(
++                "Not supported right now because Replication is completely screwed");
++          }
++          copyTask = ReplCopyTask.getDumpCopyTask(replicationSpec, fromPath, toDataPath, conf);
++        } else {
++          CopyWork cw = createCopyWork(isMmTable, lbLevels, ids, fromPath, toDataPath, conf);
++          copyTask = TaskFactory.get(cw, conf);
++        }
++        rootTasks.add(copyTask);
          inputs.add(new ReadEntity(ts.tableHandle));
        }
-       outputs.add(toWriteEntity(parentPath));
 -      outputs.add(toWriteEntity(parentPath,conf));
++      outputs.add(toWriteEntity(parentPath, conf));
 +    } catch (HiveException | IOException ex) {
 +      throw new SemanticException(ex);
      }
    }
  
-   private CopyWork createCopyWork(boolean isMmTable, int lbLevels, ValidWriteIds ids,
-       Path fromPath, Path toDataPath) throws IOException {
++  private static CopyWork createCopyWork(boolean isMmTable, int lbLevels, ValidWriteIds ids,
++      Path fromPath, Path toDataPath, Configuration conf) throws IOException {
 +    List<Path> validPaths = null;
 +    if (isMmTable) {
 +      fromPath = fromPath.getFileSystem(conf).makeQualified(fromPath);
 +      validPaths = Utilities.getValidMmDirectoriesFromTableOrPart(fromPath, conf, ids, lbLevels);
 +    }
 +    if (validPaths == null) {
 +      return new CopyWork(fromPath, toDataPath, false); // Not MM, or no need to skip anything.
 +    } else {
 +      return createCopyWorkForValidPaths(fromPath, toDataPath, validPaths);
 +    }
 +  }
  
-   private CopyWork createCopyWorkForValidPaths(
++  private static CopyWork createCopyWorkForValidPaths(
 +      Path fromPath, Path toPartPath, List<Path> validPaths) {
 +    Path[] from = new Path[validPaths.size()], to = new Path[validPaths.size()];
 +    int i = 0;
 +    String fromPathStr = fromPath.toString();
 +    if (!fromPathStr.endsWith(Path.SEPARATOR)) {
 +      fromPathStr += "/";
 +    }
 +    for (Path validPath : validPaths) {
 +      from[i] = validPath;
 +      // TODO: assumes the results are already qualified.
 +      to[i] = new Path(toPartPath, validPath.toString().substring(fromPathStr.length()));
 +      Utilities.LOG14535.info("Will copy " + from[i] + " to " + to[i]
 +          + " based on dest " + toPartPath + ", from " + fromPathStr + ", subpath " + validPath);
 +      ++i;
 +    }
 +    return new CopyWork(from, to);
 +  }
  }

http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 8aa076f,3420efd..bceef45
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@@ -33,6 -33,6 +33,7 @@@ import java.util.TreeMap
  
  import org.antlr.runtime.tree.Tree;
  import org.apache.commons.lang.ObjectUtils;
++import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
@@@ -50,7 -49,7 +51,8 @@@ import org.apache.hadoop.hive.metastore
  import org.apache.hadoop.hive.metastore.api.Partition;
  import org.apache.hadoop.hive.ql.ErrorMsg;
  import org.apache.hadoop.hive.ql.QueryState;
 +import org.apache.hadoop.hive.ql.exec.ImportCommitWork;
+ import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
  import org.apache.hadoop.hive.ql.exec.Task;
  import org.apache.hadoop.hive.ql.exec.TaskFactory;
  import org.apache.hadoop.hive.ql.exec.Utilities;
@@@ -119,12 -121,8 +125,9 @@@ public class ImportSemanticAnalyzer ext
              Map.Entry<String,String> dbTablePair = getDbTableNamePair(tableNameNode);
              parsedDbName = dbTablePair.getKey();
              parsedTableName = dbTablePair.getValue();
-             if (parsedDbName != null){
-               isDbNameSet = true;
-             }
              // get partition metadata if partition specified
              if (child.getChildCount() == 2) {
 +              @SuppressWarnings("unused") // TODO: wtf?
                ASTNode partspec = (ASTNode) child.getChild(1);
                isPartSpecSet = true;
                parsePartitionSpec(child, parsedPartSpec);
@@@ -134,126 -132,11 +137,10 @@@
        }
  
        // parsing statement is now done, on to logic.
- 
-       // initialize load path
-       URI fromURI = EximUtil.getValidatedURI(conf, stripQuotes(fromTree.getText()));
-       FileSystem fs = FileSystem.get(fromURI, conf);
-       Path fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath());
-       inputs.add(toReadEntity(fromPath));
- 
-       EximUtil.ReadMetaData rv = new EximUtil.ReadMetaData();
-       try {
-         rv =  EximUtil.readMetaData(fs, new Path(fromPath, METADATA_NAME));
-       } catch (IOException e) {
-         throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
-       }
- 
-       ReplicationSpec replicationSpec = rv.getReplicationSpec();
-       if (replicationSpec.isNoop()){
-         // nothing to do here, silently return.
-         return;
-       }
- 
-       String dbname = SessionState.get().getCurrentDatabase();
-       if (isDbNameSet){
-         // If the parsed statement contained a db.tablename specification, prefer that.
-         dbname = parsedDbName;
-       }
- 
-       // Create table associated with the import
-       // Executed if relevant, and used to contain all the other details about the table if not.
-       CreateTableDesc tblDesc = getBaseCreateTableDescFromTable(dbname, rv.getTable());
-       boolean isSourceMm = MetaStoreUtils.isInsertOnlyTable(tblDesc.getTblProps());
- 
-       if (isExternalSet) {
-         if (isSourceMm) {
-           throw new SemanticException("Cannot import an MM table as external");
-         }
-         tblDesc.setExternal(isExternalSet);
-         // This condition-check could have been avoided, but to honour the old
-         // default of not calling if it wasn't set, we retain that behaviour.
-         // TODO:cleanup after verification that the outer if isn't really needed here
-       }
- 
-       if (isLocationSet){
-         tblDesc.setLocation(parsedLocation);
-         inputs.add(toReadEntity(parsedLocation));
-       }
- 
-       if (isTableSet){
-         tblDesc.setTableName(parsedTableName);
-       }
- 
-       List<AddPartitionDesc> partitionDescs = new ArrayList<AddPartitionDesc>();
-       Iterable<Partition> partitions = rv.getPartitions();
-       for (Partition partition : partitions) {
-         // TODO: this should ideally not create AddPartitionDesc per partition
-         AddPartitionDesc partsDesc = getBaseAddPartitionDescFromPartition(fromPath, dbname, tblDesc, partition);
-         partitionDescs.add(partsDesc);
-       }
- 
-       if (isPartSpecSet){
-         // The import specification asked for only a particular partition to be loaded
-         // We load only that, and ignore all the others.
-         boolean found = false;
-         for (Iterator<AddPartitionDesc> partnIter = partitionDescs
-             .listIterator(); partnIter.hasNext();) {
-           AddPartitionDesc addPartitionDesc = partnIter.next();
-           if (!found && addPartitionDesc.getPartition(0).getPartSpec().equals(parsedPartSpec)) {
-             found = true;
-           } else {
-             partnIter.remove();
-           }
-         }
-         if (!found) {
-           throw new SemanticException(
-               ErrorMsg.INVALID_PARTITION
-                   .getMsg(" - Specified partition not found in import directory"));
-         }
-       }
- 
-       if (tblDesc.getTableName() == null) {
-         // Either we got the tablename from the IMPORT statement (first priority)
-         // or from the export dump.
-         throw new SemanticException(ErrorMsg.NEED_TABLE_SPECIFICATION.getMsg());
-       } else {
-         conf.set("import.destination.table", tblDesc.getTableName());
-         for (AddPartitionDesc addPartitionDesc : partitionDescs) {
-           addPartitionDesc.setTableName(tblDesc.getTableName());
-         }
-       }
- 
-       Warehouse wh = new Warehouse(conf);
-       Table table = tableIfExists(tblDesc);
- 
-       if (table != null){
-         checkTable(table, tblDesc, replicationSpec);
-         LOG.debug("table " + tblDesc.getTableName() + " exists: metadata checked");
-         tableExists = true;
-       }
- 
-       Long mmWriteId = null;
-       if (table != null && MetaStoreUtils.isInsertOnlyTable(table.getParameters())) {
-         mmWriteId = db.getNextTableWriteId(table.getDbName(), table.getTableName());
-       } else if (table == null && isSourceMm) {
-         // We could import everything as is - directories and IDs, but that won't work with ACID
-         // txn ids in future. So, let's import everything into the new MM directory with ID == 0.
-         mmWriteId = 0l;
-       }
-       if (mmWriteId != null) {
-         tblDesc.setInitialMmWriteId(mmWriteId);
-       }
-       if (!replicationSpec.isInReplicationScope()){
-         createRegularImportTasks(
-             rootTasks, tblDesc, partitionDescs,
-             isPartSpecSet, replicationSpec, table,
-             fromURI, fs, wh, mmWriteId, isSourceMm);
-       } else {
-         createReplImportTasks(
-             rootTasks, tblDesc, partitionDescs,
-             isPartSpecSet, replicationSpec, table,
-             fromURI, fs, wh, mmWriteId, isSourceMm);
-       }
+       tableExists = prepareImport(
+           isLocationSet, isExternalSet, isPartSpecSet, waitOnCreateDb,
+           parsedLocation, parsedTableName, parsedDbName, parsedPartSpec, fromTree.getText(),
+           new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, outputs, rootTasks, LOG, ctx));
 -
      } catch (SemanticException e) {
        throw e;
      } catch (Exception e) {
@@@ -284,7 -167,123 +171,138 @@@
      }
    }
  
-   private AddPartitionDesc getBaseAddPartitionDescFromPartition(
+   public static boolean prepareImport(
+       boolean isLocationSet, boolean isExternalSet, boolean isPartSpecSet, boolean waitOnCreateDb,
+       String parsedLocation, String parsedTableName, String parsedDbName,
+       LinkedHashMap<String, String> parsedPartSpec,
 -      String fromLocn, EximUtil.SemanticAnalyzerWrapperContext x
 -  ) throws IOException, MetaException, HiveException, URISyntaxException {
++      String fromLocn, EximUtil.SemanticAnalyzerWrapperContext x)
++      throws IOException, MetaException, HiveException, URISyntaxException {
+ 
+     // initialize load path
+     URI fromURI = EximUtil.getValidatedURI(x.getConf(), stripQuotes(fromLocn));
+     Path fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath());
+ 
+     FileSystem fs = FileSystem.get(fromURI, x.getConf());
+     x.getInputs().add(toReadEntity(fromPath, x.getConf()));
+ 
+     EximUtil.ReadMetaData rv = new EximUtil.ReadMetaData();
+     try {
+       rv =  EximUtil.readMetaData(fs, new Path(fromPath, EximUtil.METADATA_NAME));
+     } catch (IOException e) {
+       throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
+     }
+ 
+     ReplicationSpec replicationSpec = rv.getReplicationSpec();
+     if (replicationSpec.isNoop()){
+       // nothing to do here, silently return.
+       return false;
+     }
+ 
+     String dbname = SessionState.get().getCurrentDatabase();
+     if ((parsedDbName !=null) && (!parsedDbName.isEmpty())){
+       // If the parsed statement contained a db.tablename specification, prefer that.
+       dbname = parsedDbName;
+     }
+ 
+     // Create table associated with the import
+     // Executed if relevant, and used to contain all the other details about the table if not.
+     CreateTableDesc tblDesc = getBaseCreateTableDescFromTable(dbname,rv.getTable());
++    boolean isSourceMm = MetaStoreUtils.isInsertOnlyTable(tblDesc.getTblProps());
+ 
+     if (isExternalSet){
++      if (isSourceMm) {
++        throw new SemanticException("Cannot import an MM table as external");
++      }
+       tblDesc.setExternal(isExternalSet);
+       // This condition-check could have been avoided, but to honour the old
+       // default of not calling if it wasn't set, we retain that behaviour.
+       // TODO:cleanup after verification that the outer if isn't really needed here
+     }
+ 
+     if (isLocationSet){
+       tblDesc.setLocation(parsedLocation);
+       x.getInputs().add(toReadEntity(new Path(parsedLocation), x.getConf()));
+     }
+ 
+     if ((parsedTableName!= null) && (!parsedTableName.isEmpty())){
+       tblDesc.setTableName(parsedTableName);
+     }
+ 
+     List<AddPartitionDesc> partitionDescs = new ArrayList<AddPartitionDesc>();
+     Iterable<Partition> partitions = rv.getPartitions();
+     for (Partition partition : partitions) {
+       // TODO: this should ideally not create AddPartitionDesc per partition
+       AddPartitionDesc partsDesc = getBaseAddPartitionDescFromPartition(fromPath, dbname, tblDesc, partition);
+       partitionDescs.add(partsDesc);
+     }
+ 
+     if (isPartSpecSet){
+       // The import specification asked for only a particular partition to be loaded
+       // We load only that, and ignore all the others.
+       boolean found = false;
+       for (Iterator<AddPartitionDesc> partnIter = partitionDescs
+           .listIterator(); partnIter.hasNext();) {
+         AddPartitionDesc addPartitionDesc = partnIter.next();
+         if (!found && addPartitionDesc.getPartition(0).getPartSpec().equals(parsedPartSpec)) {
+           found = true;
+         } else {
+           partnIter.remove();
+         }
+       }
+       if (!found) {
+         throw new SemanticException(
+             ErrorMsg.INVALID_PARTITION
+                 .getMsg(" - Specified partition not found in import directory"));
+       }
+     }
+ 
+     if (tblDesc.getTableName() == null) {
+       // Either we got the tablename from the IMPORT statement (first priority)
+       // or from the export dump.
+       throw new SemanticException(ErrorMsg.NEED_TABLE_SPECIFICATION.getMsg());
+     } else {
+       x.getConf().set("import.destination.table", tblDesc.getTableName());
+       for (AddPartitionDesc addPartitionDesc : partitionDescs) {
+         addPartitionDesc.setTableName(tblDesc.getTableName());
+       }
+     }
+ 
+     Warehouse wh = new Warehouse(x.getConf());
+     Table table = tableIfExists(tblDesc, x.getHive());
+     boolean tableExists = false;
+ 
+     if (table != null){
+       checkTable(table, tblDesc,replicationSpec, x.getConf());
+       x.getLOG().debug("table " + tblDesc.getTableName() + " exists: metadata checked");
+       tableExists = true;
+     }
+ 
 -    if (!replicationSpec.isInReplicationScope()){
++    Long mmWriteId = null;
++    if (table != null && MetaStoreUtils.isInsertOnlyTable(table.getParameters())) {
++      mmWriteId = x.getHive().getNextTableWriteId(table.getDbName(), table.getTableName());
++    } else if (table == null && isSourceMm) {
++      // We could import everything as is - directories and IDs, but that won't work with ACID
++      // txn ids in future. So, let's import everything into the new MM directory with ID == 0.
++      mmWriteId = 0l;
++    }
++    if (mmWriteId != null) {
++      tblDesc.setInitialMmWriteId(mmWriteId);
++    }
++    if (!replicationSpec.isInReplicationScope()) {
+       createRegularImportTasks(
+           tblDesc, partitionDescs,
+           isPartSpecSet, replicationSpec, table,
 -          fromURI, fs, wh, x);
++          fromURI, fs, wh, x, mmWriteId, isSourceMm);
+     } else {
+       createReplImportTasks(
+           tblDesc, partitionDescs,
+           isPartSpecSet, replicationSpec, waitOnCreateDb, table,
 -          fromURI, fs, wh, x);
++          fromURI, fs, wh, x, mmWriteId, isSourceMm);
+     }
+     return tableExists;
+   }
+ 
+   private static AddPartitionDesc getBaseAddPartitionDescFromPartition(
        Path fromPath, String dbname, CreateTableDesc tblDesc, Partition partition) throws MetaException {
      AddPartitionDesc partsDesc = new AddPartitionDesc(dbname, tblDesc.getTableName(),
          EximUtil.makePartSpec(tblDesc.getPartCols(), partition.getValues()),
@@@ -337,98 -336,95 +355,140 @@@
      return tblDesc;
    }
  
-   private Task<?> loadTable(URI fromURI, Table table, boolean replace, Path tgtPath,
++
+   private static Task<?> loadTable(URI fromURI, Table table, boolean replace, Path tgtPath,
 -                            ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x) {
++      ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x,
 +      Long mmWriteId, boolean isSourceMm) {
      Path dataPath = new Path(fromURI.toString(), "data");
-     Path destPath = mmWriteId == null ? ctx.getExternalTmpPath(tgtPath)
 -    Path tmpPath = x.getCtx().getExternalTmpPath(tgtPath);
 -    Task<?> copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, x.getConf());
 -    LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath,
 -        Utilities.getTableDesc(table), new TreeMap<String, String>(),
 -        replace);
 -    Task<?> loadTableTask = TaskFactory.get(new MoveWork(x.getInputs(),
 -        x.getOutputs(), loadTableWork, null, false), x.getConf());
++    Path destPath = mmWriteId == null ? x.getCtx().getExternalTmpPath(tgtPath)
 +        : new Path(tgtPath, ValidWriteIds.getMmFilePrefix(mmWriteId));
 +    Utilities.LOG14535.info("adding import work for table with source location: "
 +        + dataPath + "; table: " + tgtPath + "; copy destination " + destPath + "; mm "
 +        + mmWriteId + " (src " + isSourceMm + ") for " + (table == null ? "a new table" : table.getTableName()));
 +
-     CopyWork cv = new CopyWork(dataPath, destPath, false);
-     cv.setSkipSourceMmDirs(isSourceMm);
++    Task<?> copyTask = null;
++    if (replicationSpec.isInReplicationScope()) {
++      if (isSourceMm || mmWriteId != null) {
++        // TODO: ReplCopyTask is completely screwed. Need to support when it's not as screwed.
++        throw new RuntimeException(
++            "Not supported right now because Replication is completely screwed");
++      }
++      copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf());
++    } else {
++      CopyWork cw = new CopyWork(dataPath, destPath, false);
++      cw.setSkipSourceMmDirs(isSourceMm);
++      copyTask = TaskFactory.get(cw, x.getConf());
++    }
++
 +    LoadTableDesc loadTableWork = new LoadTableDesc(destPath,
 +        Utilities.getTableDesc(table), new TreeMap<String, String>(), replace, mmWriteId);
-     MoveWork mv = new MoveWork(getInputs(), getOutputs(), loadTableWork, null, false);
-     @SuppressWarnings("unchecked")
-     Task<?> loadTableTask = TaskFactory.get(mv, conf), copyTask = TaskFactory.get(cv, conf);
++    MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, null, false);
++    Task<?> loadTableTask = TaskFactory.get(mv, x.getConf());
      copyTask.addDependentTask(loadTableTask);
-     rootTasks.add(copyTask);
+     x.getTasks().add(copyTask);
      return loadTableTask;
    }
  
-   @SuppressWarnings("unchecked")
-   private Task<?> createTableTask(CreateTableDesc tableDesc){
-     return TaskFactory.get(new DDLWork(getInputs(), getOutputs(), tableDesc), conf);
+   private static Task<?> createTableTask(CreateTableDesc tableDesc, EximUtil.SemanticAnalyzerWrapperContext x){
+     return TaskFactory.get(new DDLWork(
+         x.getInputs(),
+         x.getOutputs(),
+         tableDesc
+     ), x.getConf());
    }
  
-   @SuppressWarnings("unchecked")
-   private Task<?> dropTableTask(Table table){
-     return TaskFactory.get(new DDLWork(getInputs(), getOutputs(),
-         new DropTableDesc(table.getTableName(), null, true, true, null)), conf);
+   private static Task<?> dropTableTask(Table table, EximUtil.SemanticAnalyzerWrapperContext x){
+     return TaskFactory.get(new DDLWork(
+         x.getInputs(),
+         x.getOutputs(),
+         new DropTableDesc(table.getTableName(), null, true, true, null)
+     ), x.getConf());
    }
  
-   @SuppressWarnings("unchecked")
-   private Task<? extends Serializable> alterTableTask(CreateTableDesc tableDesc) {
+   private static Task<? extends Serializable> alterTableTask(CreateTableDesc tableDesc,
+       EximUtil.SemanticAnalyzerWrapperContext x) {
      tableDesc.setReplaceMode(true);
-     return TaskFactory.get(new DDLWork(getInputs(), getOutputs(), tableDesc), conf);
+     return TaskFactory.get(new DDLWork(
+         x.getInputs(),
+         x.getOutputs(),
+         tableDesc
+     ), x.getConf());
    }
  
-   private Task<? extends Serializable> alterSinglePartition(
+   private static Task<? extends Serializable> alterSinglePartition(
        URI fromURI, FileSystem fs, CreateTableDesc tblDesc,
        Table table, Warehouse wh, AddPartitionDesc addPartitionDesc,
-       ReplicationSpec replicationSpec, org.apache.hadoop.hive.ql.metadata.Partition ptn) {
+       ReplicationSpec replicationSpec, org.apache.hadoop.hive.ql.metadata.Partition ptn,
+       EximUtil.SemanticAnalyzerWrapperContext x) {
      addPartitionDesc.setReplaceMode(true);
      addPartitionDesc.getPartition(0).setLocation(ptn.getLocation()); // use existing location
-     @SuppressWarnings("unchecked")
-     Task<?> r = TaskFactory.get(new DDLWork(getInputs(), getOutputs(), addPartitionDesc), conf);
-     return r;
+     return TaskFactory.get(new DDLWork(
+         x.getInputs(),
+         x.getOutputs(),
+         addPartitionDesc
+     ), x.getConf());
    }
  
- 
-  private Task<?> addSinglePartition(URI fromURI, FileSystem fs, CreateTableDesc tblDesc,
-       Table table, Warehouse wh, AddPartitionDesc addPartitionDesc,
-       ReplicationSpec replicationSpec, Long mmWriteId, boolean isSourceMm, Task<?> commitTask)
+  private static Task<?> addSinglePartition(URI fromURI, FileSystem fs, CreateTableDesc tblDesc,
 -      Table table, Warehouse wh,
 -      AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x)
++      Table table, Warehouse wh, AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec,
++      EximUtil.SemanticAnalyzerWrapperContext x, Long mmWriteId, boolean isSourceMm,
++      Task<?> commitTask)
        throws MetaException, IOException, HiveException {
      AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0);
      if (tblDesc.isExternal() && tblDesc.getLocation() == null) {
-       LOG.debug("Importing in-place: adding AddPart for partition "
+       x.getLOG().debug("Importing in-place: adding AddPart for partition "
            + partSpecToString(partSpec.getPartSpec()));
        // addPartitionDesc already has the right partition location
 +      @SuppressWarnings("unchecked")
-       Task<?> addPartTask = TaskFactory.get(new DDLWork(getInputs(),
-           getOutputs(), addPartitionDesc), conf);
+       Task<?> addPartTask = TaskFactory.get(new DDLWork(x.getInputs(),
+           x.getOutputs(), addPartitionDesc), x.getConf());
        return addPartTask;
      } else {
        String srcLocation = partSpec.getLocation();
-       fixLocationInPartSpec(fs, tblDesc, table, wh, replicationSpec, partSpec);
+       fixLocationInPartSpec(fs, tblDesc, table, wh, replicationSpec, partSpec, x);
+       x.getLOG().debug("adding dependent CopyWork/AddPart/MoveWork for partition "
+           + partSpecToString(partSpec.getPartSpec())
+           + " with source location: " + srcLocation);
        Path tgtLocation = new Path(partSpec.getLocation());
-       Path destPath = mmWriteId == null ? ctx.getExternalTmpPath(tgtLocation)
 -      Path tmpPath = x.getCtx().getExternalTmpPath(tgtLocation);
 -      Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
 -          replicationSpec, new Path(srcLocation), tmpPath, x.getConf());
++      Path destPath = mmWriteId == null ? x.getCtx().getExternalTmpPath(tgtLocation)
 +          : new Path(tgtLocation, ValidWriteIds.getMmFilePrefix(mmWriteId));
 +      Path moveTaskSrc =  mmWriteId == null ? destPath : tgtLocation;
 +      Utilities.LOG14535.info("adding import work for partition with source location: "
 +          + srcLocation + "; target: " + tgtLocation + "; copy dest " + destPath + "; mm "
 +          + mmWriteId + " (src " + isSourceMm + ") for " + partSpecToString(partSpec.getPartSpec()));
-       CopyWork cw = new CopyWork(new Path(srcLocation), destPath, false);
-       cw.setSkipSourceMmDirs(isSourceMm);
-       DDLWork dw = new DDLWork(getInputs(), getOutputs(), addPartitionDesc);
++
++
++      Task<?> copyTask = null;
++      if (replicationSpec.isInReplicationScope()) {
++        if (isSourceMm || mmWriteId != null) {
++          // TODO: ReplCopyTask is completely screwed. Need to support when it's not as screwed.
++          throw new RuntimeException(
++              "Not supported right now because Replication is completely screwed");
++        }
++        copyTask = ReplCopyTask.getLoadCopyTask(
++            replicationSpec, new Path(srcLocation), destPath, x.getConf());
++      } else {
++        CopyWork cw = new CopyWork(new Path(srcLocation), destPath, false);
++        cw.setSkipSourceMmDirs(isSourceMm);
++        copyTask = TaskFactory.get(cw, x.getConf());
++      }
++
+       Task<?> addPartTask = TaskFactory.get(new DDLWork(x.getInputs(),
+           x.getOutputs(), addPartitionDesc), x.getConf());
 -      LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath,
 -          Utilities.getTableDesc(table),
 -          partSpec.getPartSpec(), true);
 +      LoadTableDesc loadTableWork = new LoadTableDesc(moveTaskSrc, Utilities.getTableDesc(table),
 +          partSpec.getPartSpec(), true, mmWriteId);
        loadTableWork.setInheritTableSpecs(false);
 +      // Do not commit the write ID from each task; need to commit once.
 +      // TODO: we should just change the import to use a single MoveTask, like dynparts.
 +      loadTableWork.setIntermediateInMmWrite(mmWriteId != null);
-       MoveWork mv = new MoveWork(getInputs(), getOutputs(), loadTableWork, null, false);
-       @SuppressWarnings("unchecked")
-       Task<?> copyTask = TaskFactory.get(cw, conf), addPartTask = TaskFactory.get(dw, conf),
-         loadPartTask = TaskFactory.get(mv, conf);
+       Task<?> loadPartTask = TaskFactory.get(new MoveWork(
 -          x.getInputs(), x.getOutputs(), loadTableWork, null, false),
 -          x.getConf());
++          x.getInputs(), x.getOutputs(), loadTableWork, null, false), x.getConf());
        copyTask.addDependentTask(loadPartTask);
        addPartTask.addDependentTask(loadPartTask);
-       rootTasks.add(copyTask);
+       x.getTasks().add(copyTask);
 +      if (commitTask != null) {
 +        loadPartTask.addDependentTask(commitTask);
 +      }
        return addPartTask;
      }
    }
@@@ -702,62 -702,69 +764,71 @@@
  
    /**
     * Create tasks for regular import, no repl complexity
+    * @param tblDesc
+    * @param partitionDescs
+    * @param isPartSpecSet
+    * @param replicationSpec
+    * @param table
+    * @param fromURI
+    * @param fs
+    * @param wh
     */
-   private void createRegularImportTasks(List<Task<? extends Serializable>> rootTasks,
+   private static void createRegularImportTasks(
 -      CreateTableDesc tblDesc,
 -      List<AddPartitionDesc> partitionDescs,
 -      boolean isPartSpecSet,
 -      ReplicationSpec replicationSpec,
 -      Table table, URI fromURI, FileSystem fs, Warehouse wh, EximUtil.SemanticAnalyzerWrapperContext x)
 +      CreateTableDesc tblDesc, List<AddPartitionDesc> partitionDescs, boolean isPartSpecSet,
 +      ReplicationSpec replicationSpec, Table table, URI fromURI, FileSystem fs, Warehouse wh,
-       Long mmWriteId, boolean isSourceMm)
++      EximUtil.SemanticAnalyzerWrapperContext x, Long mmWriteId, boolean isSourceMm)
        throws HiveException, URISyntaxException, IOException, MetaException {
  
 -    if (table != null){
 +    if (table != null) {
        if (table.isPartitioned()) {
-         LOG.debug("table partitioned");
-         Task<?> ict = createImportCommitTask(table.getDbName(), table.getTableName(), mmWriteId);
+         x.getLOG().debug("table partitioned");
++        Task<?> ict = createImportCommitTask(
++            table.getDbName(), table.getTableName(), mmWriteId, x.getConf());
  
          for (AddPartitionDesc addPartitionDesc : partitionDescs) {
            Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
            org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
-           if ((ptn = db.getPartition(table, partSpec, false)) == null) {
-             rootTasks.add(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc,
-                 replicationSpec, mmWriteId, isSourceMm, ict));
+           if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) {
+             x.getTasks().add(addSinglePartition(
 -                fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
++                fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, mmWriteId, isSourceMm, ict));
            } else {
              throw new SemanticException(
                  ErrorMsg.PARTITION_EXISTS.getMsg(partSpecToString(partSpec)));
            }
          }
 -
        } else {
-         LOG.debug("table non-partitioned");
+         x.getLOG().debug("table non-partitioned");
          // ensure if destination is not empty only for regular import
          Path tgtPath = new Path(table.getDataLocation().toString());
-         FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), conf);
-         checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec);
-         loadTable(fromURI, table, false, tgtPath, mmWriteId, isSourceMm);
+         FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), x.getConf());
+         checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec, x);
 -        loadTable(fromURI, table, false, tgtPath, replicationSpec,x);
++        loadTable(fromURI, table, false, tgtPath, replicationSpec, x, mmWriteId, isSourceMm);
        }
        // Set this to read because we can't overwrite any existing partitions
-       outputs.add(new WriteEntity(table, WriteEntity.WriteType.DDL_NO_LOCK));
+       x.getOutputs().add(new WriteEntity(table, WriteEntity.WriteType.DDL_NO_LOCK));
      } else {
-       LOG.debug("table " + tblDesc.getTableName() + " does not exist");
+       x.getLOG().debug("table " + tblDesc.getTableName() + " does not exist");
  
 +      @SuppressWarnings("unchecked")
-       Task<?> t = TaskFactory.get(new DDLWork(getInputs(), getOutputs(), tblDesc), conf);
+       Task<?> t = TaskFactory.get(new DDLWork(x.getInputs(), x.getOutputs(), tblDesc), x.getConf());
        table = new Table(tblDesc.getDatabaseName(), tblDesc.getTableName());
-       Database parentDb = db.getDatabase(tblDesc.getDatabaseName());
+       Database parentDb = x.getHive().getDatabase(tblDesc.getDatabaseName());
  
        // Since we are going to be creating a new table in a db, we should mark that db as a write entity
        // so that the auth framework can go to work there.
-       outputs.add(new WriteEntity(parentDb, WriteEntity.WriteType.DDL_SHARED));
+       x.getOutputs().add(new WriteEntity(parentDb, WriteEntity.WriteType.DDL_SHARED));
  
        if (isPartitioned(tblDesc)) {
 +        Task<?> ict = createImportCommitTask(
-             tblDesc.getDatabaseName(), tblDesc.getTableName(), mmWriteId);
++            tblDesc.getDatabaseName(), tblDesc.getTableName(), mmWriteId, x.getConf());
          for (AddPartitionDesc addPartitionDesc : partitionDescs) {
 -          t.addDependentTask(
 -              addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
 +          t.addDependentTask(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc,
-               replicationSpec, mmWriteId, isSourceMm, ict));
++            replicationSpec, x, mmWriteId, isSourceMm, ict));
          }
        } else {
-         LOG.debug("adding dependent CopyWork/MoveWork for table");
+         x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
          if (tblDesc.isExternal() && (tblDesc.getLocation() == null)) {
-           LOG.debug("Importing in place, no emptiness check, no copying/loading");
+           x.getLOG().debug("Importing in place, no emptiness check, no copying/loading");
            Path dataPath = new Path(fromURI.toString(), "data");
            tblDesc.setLocation(dataPath.toString());
          } else {
@@@ -767,32 -774,27 +838,35 @@@
            } else {
              tablePath = wh.getTablePath(parentDb, tblDesc.getTableName());
            }
-           FileSystem tgtFs = FileSystem.get(tablePath.toUri(), conf);
-           checkTargetLocationEmpty(tgtFs, tablePath, replicationSpec);
-           t.addDependentTask(loadTable(fromURI, table, false, tablePath, mmWriteId, isSourceMm));
+           FileSystem tgtFs = FileSystem.get(tablePath.toUri(), x.getConf());
+           checkTargetLocationEmpty(tgtFs, tablePath, replicationSpec,x);
 -          t.addDependentTask(loadTable(fromURI, table, false, tablePath, replicationSpec, x));
++          t.addDependentTask(loadTable(fromURI, table, false, tablePath, replicationSpec, x, mmWriteId, isSourceMm));
          }
        }
-       rootTasks.add(t);
+       x.getTasks().add(t);
      }
    }
  
-   private Task<?> createImportCommitTask(String dbName, String tblName, Long mmWriteId) {
++  private static Task<?> createImportCommitTask(
++      String dbName, String tblName, Long mmWriteId, HiveConf conf) {
 +    @SuppressWarnings("unchecked")
 +    Task<ImportCommitWork> ict = (mmWriteId == null) ? null : TaskFactory.get(
 +        new ImportCommitWork(dbName, tblName, mmWriteId), conf);
 +    return ict;
 +  }
 +
    /**
     * Create tasks for repl import
     */
-   private void createReplImportTasks(List<Task<? extends Serializable>> rootTasks,
-       CreateTableDesc tblDesc, List<AddPartitionDesc> partitionDescs, boolean isPartSpecSet,
-       ReplicationSpec replicationSpec, Table table, URI fromURI, FileSystem fs, Warehouse wh,
-       Long mmWriteId, boolean isSourceMm)
+   private static void createReplImportTasks(
+       CreateTableDesc tblDesc,
+       List<AddPartitionDesc> partitionDescs,
+       boolean isPartSpecSet, ReplicationSpec replicationSpec, boolean waitOnCreateDb,
+       Table table, URI fromURI, FileSystem fs, Warehouse wh,
 -      EximUtil.SemanticAnalyzerWrapperContext x)
++      EximUtil.SemanticAnalyzerWrapperContext x, Long mmWriteId, boolean isSourceMm)
        throws HiveException, URISyntaxException, IOException, MetaException {
  
 -    Task dr = null;
 +    Task<?> dr = null;
      WriteEntity.WriteType lockType = WriteEntity.WriteType.DDL_NO_LOCK;
  
      if ((table != null) && (isPartitioned(tblDesc) != table.isPartitioned())){
@@@ -840,16 -859,13 +931,15 @@@
  
        if (!replicationSpec.isMetadataOnly()) {
          if (isPartitioned(tblDesc)) {
 +          Task<?> ict = createImportCommitTask(
-               tblDesc.getDatabaseName(), tblDesc.getTableName(), mmWriteId);
++              tblDesc.getDatabaseName(), tblDesc.getTableName(), mmWriteId, x.getConf());
            for (AddPartitionDesc addPartitionDesc : partitionDescs) {
-             t.addDependentTask(addSinglePartition(fromURI, fs, tblDesc, table, wh,
-                 addPartitionDesc, replicationSpec, mmWriteId, isSourceMm, ict));
+             t.addDependentTask(
 -                addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
++                addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, mmWriteId, isSourceMm, ict));
            }
          } else {
-           LOG.debug("adding dependent CopyWork/MoveWork for table");
-           t.addDependentTask(loadTable(
-               fromURI, table, true, new Path(tblDesc.getLocation()), mmWriteId, isSourceMm));
+           x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
 -          t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()),replicationSpec, x));
++          t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()), replicationSpec, x, mmWriteId, isSourceMm));
          }
        }
        if (dr == null){
@@@ -863,27 -879,27 +953,27 @@@
      } else {
        // Table existed, and is okay to replicate into, not dropping and re-creating.
        if (table.isPartitioned()) {
-         LOG.debug("table partitioned");
+         x.getLOG().debug("table partitioned");
          for (AddPartitionDesc addPartitionDesc : partitionDescs) {
 -
            Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
            org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
 -
 +          Task<?> ict = replicationSpec.isMetadataOnly() ? null : createImportCommitTask(
-               tblDesc.getDatabaseName(), tblDesc.getTableName(), mmWriteId);
-           if ((ptn = db.getPartition(table, partSpec, false)) == null) {
++              tblDesc.getDatabaseName(), tblDesc.getTableName(), mmWriteId, x.getConf());
+           if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) {
              if (!replicationSpec.isMetadataOnly()){
-               rootTasks.add(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc,
-                   replicationSpec, mmWriteId, isSourceMm, ict));
+               x.getTasks().add(addSinglePartition(
 -                  fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
++                  fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, mmWriteId, isSourceMm, ict));
              }
            } else {
              // If replicating, then the partition already existing means we need to replace, maybe, if
              // the destination ptn's repl.last.id is older than the replacement's.
              if (replicationSpec.allowReplacementInto(ptn)){
                if (!replicationSpec.isMetadataOnly()){
-                 rootTasks.add(addSinglePartition(fromURI, fs, tblDesc, table, wh,
-                     addPartitionDesc, replicationSpec, mmWriteId, isSourceMm, ict));
+                 x.getTasks().add(addSinglePartition(
 -                    fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
++                    fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, mmWriteId, isSourceMm, ict));
                } else {
-                 rootTasks.add(alterSinglePartition(fromURI, fs, tblDesc, table, wh,
-                     addPartitionDesc, replicationSpec, ptn));
+                 x.getTasks().add(alterSinglePartition(
+                     fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x));
                }
                if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){
                  lockType = WriteEntity.WriteType.DDL_SHARED;
@@@ -907,10 -923,9 +997,10 @@@
            return; // silently return, table is newer than our replacement.
          }
          if (!replicationSpec.isMetadataOnly()) {
 -          loadTable(fromURI, table, true, new Path(fromURI), replicationSpec, x); // repl-imports are replace-into
 +          // repl-imports are replace-into
-           loadTable(fromURI, table, true, new Path(fromURI), mmWriteId, isSourceMm);
++          loadTable(fromURI, table, true, new Path(fromURI), replicationSpec, x, mmWriteId, isSourceMm);
          } else {
-           rootTasks.add(alterTableTask(tblDesc));
+           x.getTasks().add(alterTableTask(tblDesc, x));
          }
          if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){
            lockType = WriteEntity.WriteType.DDL_SHARED;

http://git-wip-us.apache.org/repos/asf/hive/blob/b3914c37/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
----------------------------------------------------------------------