You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jc...@apache.org on 2018/04/10 09:29:40 UTC

[02/24] hive git commit: HIVE-18839: Implement incremental rebuild for materialized views (only insert operations in source tables) (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)

http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/gen/thrift/gen-php/metastore/ThriftHiveMetastore.php
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/gen/thrift/gen-php/metastore/ThriftHiveMetastore.php b/standalone-metastore/src/gen/thrift/gen-php/metastore/ThriftHiveMetastore.php
index 5e3dff1..7a8a42a 100644
--- a/standalone-metastore/src/gen/thrift/gen-php/metastore/ThriftHiveMetastore.php
+++ b/standalone-metastore/src/gen/thrift/gen-php/metastore/ThriftHiveMetastore.php
@@ -1514,6 +1514,20 @@ interface ThriftHiveMetastoreIf extends \FacebookServiceIf {
    * @throws \metastore\MetaException
    */
   public function get_serde(\metastore\GetSerdeRequest $rqst);
+  /**
+   * @param string $dbName database name (presumably that of the materialized view being rebuilt; confirm against the Thrift IDL)
+   * @param string $tableName table name (presumably the materialized view itself; confirm against the Thrift IDL)
+   * @param int $txnId presumably the id of the transaction performing the rebuild; confirm against the Thrift IDL
+   * @return \metastore\LockResponse
+   */
+  public function get_lock_materialization_rebuild($dbName, $tableName, $txnId);
+  /**
+   * @param string $dbName database name (presumably that of the materialized view being rebuilt; confirm against the Thrift IDL)
+   * @param string $tableName table name (presumably the materialized view itself; confirm against the Thrift IDL)
+   * @param int $txnId presumably the id of the transaction holding the rebuild lock; confirm against the Thrift IDL
+   * @return bool heartbeat outcome; the meaning of true/false is decided by the service implementation, which is not visible in this file
+   */
+  public function heartbeat_lock_materialization_rebuild($dbName, $tableName, $txnId);
 }
 
 class ThriftHiveMetastoreClient extends \FacebookServiceClient implements \metastore\ThriftHiveMetastoreIf {
@@ -12887,6 +12901,112 @@ class ThriftHiveMetastoreClient extends \FacebookServiceClient implements \metas
     throw new \Exception("get_serde failed: unknown result");
   }
 
+  public function get_lock_materialization_rebuild($dbName, $tableName, $txnId) // Client call: sends the request, then reads and returns the reply.
+  {
+    $this->send_get_lock_materialization_rebuild($dbName, $tableName, $txnId);
+    return $this->recv_get_lock_materialization_rebuild(); // the result's 'success' value, or an exception is thrown
+  }
+
+  public function send_get_lock_materialization_rebuild($dbName, $tableName, $txnId) // Writes a CALL message named 'get_lock_materialization_rebuild' to the output protocol.
+  {
+    $args = new \metastore\ThriftHiveMetastore_get_lock_materialization_rebuild_args(); // argument struct carrying the three parameters
+    $args->dbName = $dbName;
+    $args->tableName = $tableName;
+    $args->txnId = $txnId;
+    $bin_accel = ($this->output_ instanceof TBinaryProtocolAccelerated) && function_exists('thrift_protocol_write_binary'); // true only when the accelerated protocol class is in use AND the extension function exists
+    if ($bin_accel)
+    {
+      thrift_protocol_write_binary($this->output_, 'get_lock_materialization_rebuild', TMessageType::CALL, $args, $this->seqid_, $this->output_->isStrictWrite()); // accelerated path; NOTE(review): no explicit flush here - presumably the extension function flushes, confirm
+    }
+    else
+    {
+      $this->output_->writeMessageBegin('get_lock_materialization_rebuild', TMessageType::CALL, $this->seqid_); // manual path: message header ...
+      $args->write($this->output_); // ... then the argument struct ...
+      $this->output_->writeMessageEnd();
+      $this->output_->getTransport()->flush(); // ... then flush the underlying transport
+    }
+  }
+
+  public function recv_get_lock_materialization_rebuild() // Reads the reply to get_lock_materialization_rebuild; returns its 'success' value or throws.
+  {
+    $bin_accel = ($this->input_ instanceof TBinaryProtocolAccelerated) && function_exists('thrift_protocol_read_binary'); // true only when the accelerated protocol class is in use AND the extension function exists
+    if ($bin_accel) $result = thrift_protocol_read_binary($this->input_, '\metastore\ThriftHiveMetastore_get_lock_materialization_rebuild_result', $this->input_->isStrictRead()); // accelerated path: result class is passed by name
+    else
+    {
+      $rseqid = 0;
+      $fname = null;
+      $mtype = 0;
+
+      $this->input_->readMessageBegin($fname, $mtype, $rseqid); // $fname and $rseqid are filled in but not checked in this method
+      if ($mtype == TMessageType::EXCEPTION) { // reply is an exception message: decode it and rethrow
+        $x = new TApplicationException();
+        $x->read($this->input_);
+        $this->input_->readMessageEnd();
+        throw $x;
+      }
+      $result = new \metastore\ThriftHiveMetastore_get_lock_materialization_rebuild_result();
+      $result->read($this->input_);
+      $this->input_->readMessageEnd();
+    }
+    if ($result->success !== null) {
+      return $result->success;
+    }
+    throw new \Exception("get_lock_materialization_rebuild failed: unknown result"); // reached when the result carried no 'success' value
+  }
+
+  public function heartbeat_lock_materialization_rebuild($dbName, $tableName, $txnId) // Client call: sends the request, then reads and returns the reply.
+  {
+    $this->send_heartbeat_lock_materialization_rebuild($dbName, $tableName, $txnId);
+    return $this->recv_heartbeat_lock_materialization_rebuild(); // the result's 'success' value, or an exception is thrown
+  }
+
+  public function send_heartbeat_lock_materialization_rebuild($dbName, $tableName, $txnId) // Writes a CALL message named 'heartbeat_lock_materialization_rebuild' to the output protocol.
+  {
+    $args = new \metastore\ThriftHiveMetastore_heartbeat_lock_materialization_rebuild_args(); // argument struct carrying the three parameters
+    $args->dbName = $dbName;
+    $args->tableName = $tableName;
+    $args->txnId = $txnId;
+    $bin_accel = ($this->output_ instanceof TBinaryProtocolAccelerated) && function_exists('thrift_protocol_write_binary'); // true only when the accelerated protocol class is in use AND the extension function exists
+    if ($bin_accel)
+    {
+      thrift_protocol_write_binary($this->output_, 'heartbeat_lock_materialization_rebuild', TMessageType::CALL, $args, $this->seqid_, $this->output_->isStrictWrite()); // accelerated path; NOTE(review): no explicit flush here - presumably the extension function flushes, confirm
+    }
+    else
+    {
+      $this->output_->writeMessageBegin('heartbeat_lock_materialization_rebuild', TMessageType::CALL, $this->seqid_); // manual path: message header ...
+      $args->write($this->output_); // ... then the argument struct ...
+      $this->output_->writeMessageEnd();
+      $this->output_->getTransport()->flush(); // ... then flush the underlying transport
+    }
+  }
+
+  public function recv_heartbeat_lock_materialization_rebuild() // Reads the reply to heartbeat_lock_materialization_rebuild; returns its 'success' value or throws.
+  {
+    $bin_accel = ($this->input_ instanceof TBinaryProtocolAccelerated) && function_exists('thrift_protocol_read_binary'); // true only when the accelerated protocol class is in use AND the extension function exists
+    if ($bin_accel) $result = thrift_protocol_read_binary($this->input_, '\metastore\ThriftHiveMetastore_heartbeat_lock_materialization_rebuild_result', $this->input_->isStrictRead()); // accelerated path: result class is passed by name
+    else
+    {
+      $rseqid = 0;
+      $fname = null;
+      $mtype = 0;
+
+      $this->input_->readMessageBegin($fname, $mtype, $rseqid); // $fname and $rseqid are filled in but not checked in this method
+      if ($mtype == TMessageType::EXCEPTION) { // reply is an exception message: decode it and rethrow
+        $x = new TApplicationException();
+        $x->read($this->input_);
+        $this->input_->readMessageEnd();
+        throw $x;
+      }
+      $result = new \metastore\ThriftHiveMetastore_heartbeat_lock_materialization_rebuild_result();
+      $result->read($this->input_);
+      $this->input_->readMessageEnd();
+    }
+    if ($result->success !== null) { // strict null test, so a boolean false reply is still returned
+      return $result->success;
+    }
+    throw new \Exception("heartbeat_lock_materialization_rebuild failed: unknown result"); // reached when the result carried no 'success' value
+  }
+
 }
 
 // HELPER FUNCTIONS AND STRUCTURES
@@ -57981,4 +58101,401 @@ class ThriftHiveMetastore_get_serde_result {
 
 }
 
+class ThriftHiveMetastore_get_lock_materialization_rebuild_args { // Argument struct for the get_lock_materialization_rebuild call: dbName (1), tableName (2), txnId (3).
+  static $_TSPEC; // field-id => field-spec map, filled in by the first constructor call
+
+  /**
+   * @var string Thrift field 1 (TType::STRING)
+   */
+  public $dbName = null;
+  /**
+   * @var string Thrift field 2 (TType::STRING)
+   */
+  public $tableName = null;
+  /**
+   * @var int Thrift field 3 (TType::I64)
+   */
+  public $txnId = null;
+
+  public function __construct($vals=null) { // $vals: optional array keyed by field name
+    if (!isset(self::$_TSPEC)) { // build the shared spec only once
+      self::$_TSPEC = array(
+        1 => array(
+          'var' => 'dbName',
+          'type' => TType::STRING,
+          ),
+        2 => array(
+          'var' => 'tableName',
+          'type' => TType::STRING,
+          ),
+        3 => array(
+          'var' => 'txnId',
+          'type' => TType::I64,
+          ),
+        );
+    }
+    if (is_array($vals)) { // a non-array $vals is ignored; isset() also skips keys whose value is null
+      if (isset($vals['dbName'])) {
+        $this->dbName = $vals['dbName'];
+      }
+      if (isset($vals['tableName'])) {
+        $this->tableName = $vals['tableName'];
+      }
+      if (isset($vals['txnId'])) {
+        $this->txnId = $vals['txnId'];
+      }
+    }
+  }
+
+  public function getName() { // returns the struct name, identical to the class name
+    return 'ThriftHiveMetastore_get_lock_materialization_rebuild_args';
+  }
+
+  public function read($input) // Populates this struct from $input; returns $xfer, the sum of the protocol calls' return values (presumably a byte count).
+  {
+    $xfer = 0;
+    $fname = null;
+    $ftype = 0;
+    $fid = 0;
+    $xfer += $input->readStructBegin($fname);
+    while (true)
+    {
+      $xfer += $input->readFieldBegin($fname, $ftype, $fid);
+      if ($ftype == TType::STOP) { // end-of-struct marker
+        break;
+      }
+      switch ($fid) // dispatch on field id; a known id with an unexpected wire type is skipped, not coerced
+      {
+        case 1:
+          if ($ftype == TType::STRING) {
+            $xfer += $input->readString($this->dbName);
+          } else {
+            $xfer += $input->skip($ftype);
+          }
+          break;
+        case 2:
+          if ($ftype == TType::STRING) {
+            $xfer += $input->readString($this->tableName);
+          } else {
+            $xfer += $input->skip($ftype);
+          }
+          break;
+        case 3:
+          if ($ftype == TType::I64) {
+            $xfer += $input->readI64($this->txnId);
+          } else {
+            $xfer += $input->skip($ftype);
+          }
+          break;
+        default: // unknown field id: skip its payload
+          $xfer += $input->skip($ftype);
+          break;
+      }
+      $xfer += $input->readFieldEnd();
+    }
+    $xfer += $input->readStructEnd();
+    return $xfer;
+  }
+
+  public function write($output) { // Serializes this struct to $output; null fields are omitted. Returns the accumulated $xfer.
+    $xfer = 0;
+    $xfer += $output->writeStructBegin('ThriftHiveMetastore_get_lock_materialization_rebuild_args');
+    if ($this->dbName !== null) {
+      $xfer += $output->writeFieldBegin('dbName', TType::STRING, 1);
+      $xfer += $output->writeString($this->dbName);
+      $xfer += $output->writeFieldEnd();
+    }
+    if ($this->tableName !== null) {
+      $xfer += $output->writeFieldBegin('tableName', TType::STRING, 2);
+      $xfer += $output->writeString($this->tableName);
+      $xfer += $output->writeFieldEnd();
+    }
+    if ($this->txnId !== null) {
+      $xfer += $output->writeFieldBegin('txnId', TType::I64, 3);
+      $xfer += $output->writeI64($this->txnId);
+      $xfer += $output->writeFieldEnd();
+    }
+    $xfer += $output->writeFieldStop(); // stop marker is always written, even when every field was null
+    $xfer += $output->writeStructEnd();
+    return $xfer;
+  }
+
+}
+
+class ThriftHiveMetastore_get_lock_materialization_rebuild_result { // Result struct for the get_lock_materialization_rebuild call; its only field is 'success' (id 0).
+  static $_TSPEC; // field-id => field-spec map, filled in by the first constructor call
+
+  /**
+   * @var \metastore\LockResponse Thrift field 0 (TType::STRUCT); null until set or read
+   */
+  public $success = null;
+
+  public function __construct($vals=null) { // $vals: optional array keyed by field name
+    if (!isset(self::$_TSPEC)) { // build the shared spec only once
+      self::$_TSPEC = array(
+        0 => array(
+          'var' => 'success',
+          'type' => TType::STRUCT,
+          'class' => '\metastore\LockResponse',
+          ),
+        );
+    }
+    if (is_array($vals)) { // a non-array $vals is ignored; isset() also skips keys whose value is null
+      if (isset($vals['success'])) {
+        $this->success = $vals['success'];
+      }
+    }
+  }
+
+  public function getName() { // returns the struct name, identical to the class name
+    return 'ThriftHiveMetastore_get_lock_materialization_rebuild_result';
+  }
+
+  public function read($input) // Populates this struct from $input; returns $xfer, the sum of the protocol calls' return values (presumably a byte count).
+  {
+    $xfer = 0;
+    $fname = null;
+    $ftype = 0;
+    $fid = 0;
+    $xfer += $input->readStructBegin($fname);
+    while (true)
+    {
+      $xfer += $input->readFieldBegin($fname, $ftype, $fid);
+      if ($ftype == TType::STOP) { // end-of-struct marker
+        break;
+      }
+      switch ($fid)
+      {
+        case 0:
+          if ($ftype == TType::STRUCT) {
+            $this->success = new \metastore\LockResponse(); // fresh object that then decodes itself from $input
+            $xfer += $this->success->read($input);
+          } else {
+            $xfer += $input->skip($ftype);
+          }
+          break;
+        default: // unknown field id: skip its payload
+          $xfer += $input->skip($ftype);
+          break;
+      }
+      $xfer += $input->readFieldEnd();
+    }
+    $xfer += $input->readStructEnd();
+    return $xfer;
+  }
+
+  public function write($output) { // Serializes this struct to $output; a null 'success' is omitted. Returns the accumulated $xfer.
+    $xfer = 0;
+    $xfer += $output->writeStructBegin('ThriftHiveMetastore_get_lock_materialization_rebuild_result');
+    if ($this->success !== null) {
+      if (!is_object($this->success)) { // guard: the value must be an object because ->write() is called on it below
+        throw new TProtocolException('Bad type in structure.', TProtocolException::INVALID_DATA);
+      }
+      $xfer += $output->writeFieldBegin('success', TType::STRUCT, 0);
+      $xfer += $this->success->write($output);
+      $xfer += $output->writeFieldEnd();
+    }
+    $xfer += $output->writeFieldStop(); // stop marker is always written, even when 'success' was null
+    $xfer += $output->writeStructEnd();
+    return $xfer;
+  }
+
+}
+
+class ThriftHiveMetastore_heartbeat_lock_materialization_rebuild_args { // Argument struct for the heartbeat_lock_materialization_rebuild call: dbName (1), tableName (2), txnId (3).
+  static $_TSPEC; // field-id => field-spec map, filled in by the first constructor call
+
+  /**
+   * @var string Thrift field 1 (TType::STRING)
+   */
+  public $dbName = null;
+  /**
+   * @var string Thrift field 2 (TType::STRING)
+   */
+  public $tableName = null;
+  /**
+   * @var int Thrift field 3 (TType::I64)
+   */
+  public $txnId = null;
+
+  public function __construct($vals=null) { // $vals: optional array keyed by field name
+    if (!isset(self::$_TSPEC)) { // build the shared spec only once
+      self::$_TSPEC = array(
+        1 => array(
+          'var' => 'dbName',
+          'type' => TType::STRING,
+          ),
+        2 => array(
+          'var' => 'tableName',
+          'type' => TType::STRING,
+          ),
+        3 => array(
+          'var' => 'txnId',
+          'type' => TType::I64,
+          ),
+        );
+    }
+    if (is_array($vals)) { // a non-array $vals is ignored; isset() also skips keys whose value is null
+      if (isset($vals['dbName'])) {
+        $this->dbName = $vals['dbName'];
+      }
+      if (isset($vals['tableName'])) {
+        $this->tableName = $vals['tableName'];
+      }
+      if (isset($vals['txnId'])) {
+        $this->txnId = $vals['txnId'];
+      }
+    }
+  }
+
+  public function getName() { // returns the struct name, identical to the class name
+    return 'ThriftHiveMetastore_heartbeat_lock_materialization_rebuild_args';
+  }
+
+  public function read($input) // Populates this struct from $input; returns $xfer, the sum of the protocol calls' return values (presumably a byte count).
+  {
+    $xfer = 0;
+    $fname = null;
+    $ftype = 0;
+    $fid = 0;
+    $xfer += $input->readStructBegin($fname);
+    while (true)
+    {
+      $xfer += $input->readFieldBegin($fname, $ftype, $fid);
+      if ($ftype == TType::STOP) { // end-of-struct marker
+        break;
+      }
+      switch ($fid) // dispatch on field id; a known id with an unexpected wire type is skipped, not coerced
+      {
+        case 1:
+          if ($ftype == TType::STRING) {
+            $xfer += $input->readString($this->dbName);
+          } else {
+            $xfer += $input->skip($ftype);
+          }
+          break;
+        case 2:
+          if ($ftype == TType::STRING) {
+            $xfer += $input->readString($this->tableName);
+          } else {
+            $xfer += $input->skip($ftype);
+          }
+          break;
+        case 3:
+          if ($ftype == TType::I64) {
+            $xfer += $input->readI64($this->txnId);
+          } else {
+            $xfer += $input->skip($ftype);
+          }
+          break;
+        default: // unknown field id: skip its payload
+          $xfer += $input->skip($ftype);
+          break;
+      }
+      $xfer += $input->readFieldEnd();
+    }
+    $xfer += $input->readStructEnd();
+    return $xfer;
+  }
+
+  public function write($output) { // Serializes this struct to $output; null fields are omitted. Returns the accumulated $xfer.
+    $xfer = 0;
+    $xfer += $output->writeStructBegin('ThriftHiveMetastore_heartbeat_lock_materialization_rebuild_args');
+    if ($this->dbName !== null) {
+      $xfer += $output->writeFieldBegin('dbName', TType::STRING, 1);
+      $xfer += $output->writeString($this->dbName);
+      $xfer += $output->writeFieldEnd();
+    }
+    if ($this->tableName !== null) {
+      $xfer += $output->writeFieldBegin('tableName', TType::STRING, 2);
+      $xfer += $output->writeString($this->tableName);
+      $xfer += $output->writeFieldEnd();
+    }
+    if ($this->txnId !== null) {
+      $xfer += $output->writeFieldBegin('txnId', TType::I64, 3);
+      $xfer += $output->writeI64($this->txnId);
+      $xfer += $output->writeFieldEnd();
+    }
+    $xfer += $output->writeFieldStop(); // stop marker is always written, even when every field was null
+    $xfer += $output->writeStructEnd();
+    return $xfer;
+  }
+
+}
+
+class ThriftHiveMetastore_heartbeat_lock_materialization_rebuild_result { // Result struct for the heartbeat_lock_materialization_rebuild call; its only field is 'success' (id 0).
+  static $_TSPEC; // field-id => field-spec map, filled in by the first constructor call
+
+  /**
+   * @var bool Thrift field 0 (TType::BOOL); null until set or read
+   */
+  public $success = null;
+
+  public function __construct($vals=null) { // $vals: optional array keyed by field name
+    if (!isset(self::$_TSPEC)) { // build the shared spec only once
+      self::$_TSPEC = array(
+        0 => array(
+          'var' => 'success',
+          'type' => TType::BOOL,
+          ),
+        );
+    }
+    if (is_array($vals)) { // a non-array $vals is ignored; isset() also skips keys whose value is null
+      if (isset($vals['success'])) {
+        $this->success = $vals['success'];
+      }
+    }
+  }
+
+  public function getName() { // returns the struct name, identical to the class name
+    return 'ThriftHiveMetastore_heartbeat_lock_materialization_rebuild_result';
+  }
+
+  public function read($input) // Populates this struct from $input; returns $xfer, the sum of the protocol calls' return values (presumably a byte count).
+  {
+    $xfer = 0;
+    $fname = null;
+    $ftype = 0;
+    $fid = 0;
+    $xfer += $input->readStructBegin($fname);
+    while (true)
+    {
+      $xfer += $input->readFieldBegin($fname, $ftype, $fid);
+      if ($ftype == TType::STOP) { // end-of-struct marker
+        break;
+      }
+      switch ($fid)
+      {
+        case 0:
+          if ($ftype == TType::BOOL) {
+            $xfer += $input->readBool($this->success);
+          } else {
+            $xfer += $input->skip($ftype);
+          }
+          break;
+        default: // unknown field id: skip its payload
+          $xfer += $input->skip($ftype);
+          break;
+      }
+      $xfer += $input->readFieldEnd();
+    }
+    $xfer += $input->readStructEnd();
+    return $xfer;
+  }
+
+  public function write($output) { // Serializes this struct to $output; a null 'success' is omitted. Returns the accumulated $xfer.
+    $xfer = 0;
+    $xfer += $output->writeStructBegin('ThriftHiveMetastore_heartbeat_lock_materialization_rebuild_result');
+    if ($this->success !== null) { // strict null test, so boolean false is still written
+      $xfer += $output->writeFieldBegin('success', TType::BOOL, 0);
+      $xfer += $output->writeBool($this->success);
+      $xfer += $output->writeFieldEnd();
+    }
+    $xfer += $output->writeFieldStop(); // stop marker is always written, even when 'success' was null
+    $xfer += $output->writeStructEnd();
+    return $xfer;
+  }
+
+}
+
 

http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/gen/thrift/gen-php/metastore/Types.php
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/gen/thrift/gen-php/metastore/Types.php b/standalone-metastore/src/gen/thrift/gen-php/metastore/Types.php
index d4fcc88..14416b4 100644
--- a/standalone-metastore/src/gen/thrift/gen-php/metastore/Types.php
+++ b/standalone-metastore/src/gen/thrift/gen-php/metastore/Types.php
@@ -23975,6 +23975,10 @@ class Materialization {
    * @var int
    */
   public $invalidationTime = null;
+  /**
+   * @var bool
+   */
+  public $sourceTablesUpdateDeleteModified = null;
 
   public function __construct($vals=null) {
     if (!isset(self::$_TSPEC)) {
@@ -23995,6 +23999,10 @@ class Materialization {
           'var' => 'invalidationTime',
           'type' => TType::I64,
           ),
+        4 => array(
+          'var' => 'sourceTablesUpdateDeleteModified',
+          'type' => TType::BOOL,
+          ),
         );
     }
     if (is_array($vals)) {
@@ -24007,6 +24015,9 @@ class Materialization {
       if (isset($vals['invalidationTime'])) {
         $this->invalidationTime = $vals['invalidationTime'];
       }
+      if (isset($vals['sourceTablesUpdateDeleteModified'])) {
+        $this->sourceTablesUpdateDeleteModified = $vals['sourceTablesUpdateDeleteModified'];
+      }
     }
   }
 
@@ -24064,6 +24075,13 @@ class Materialization {
             $xfer += $input->skip($ftype);
           }
           break;
+        case 4:
+          if ($ftype == TType::BOOL) {
+            $xfer += $input->readBool($this->sourceTablesUpdateDeleteModified);
+          } else {
+            $xfer += $input->skip($ftype);
+          }
+          break;
         default:
           $xfer += $input->skip($ftype);
           break;
@@ -24108,6 +24126,11 @@ class Materialization {
       $xfer += $output->writeI64($this->invalidationTime);
       $xfer += $output->writeFieldEnd();
     }
+    if ($this->sourceTablesUpdateDeleteModified !== null) {
+      $xfer += $output->writeFieldBegin('sourceTablesUpdateDeleteModified', TType::BOOL, 4);
+      $xfer += $output->writeBool($this->sourceTablesUpdateDeleteModified);
+      $xfer += $output->writeFieldEnd();
+    }
     $xfer += $output->writeFieldStop();
     $xfer += $output->writeStructEnd();
     return $xfer;

http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore-remote
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore-remote b/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore-remote
index d39690f..079c7fc 100755
--- a/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore-remote
+++ b/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore-remote
@@ -224,6 +224,8 @@ if len(sys.argv) <= 1 or sys.argv[1] == '--help':
   print('  void set_schema_version_state(SetSchemaVersionStateRequest rqst)')
   print('  void add_serde(SerDeInfo serde)')
   print('  SerDeInfo get_serde(GetSerdeRequest rqst)')
+  print('  LockResponse get_lock_materialization_rebuild(string dbName, string tableName, i64 txnId)')
+  print('  bool heartbeat_lock_materialization_rebuild(string dbName, string tableName, i64 txnId)')
   print('  string getName()')
   print('  string getVersion()')
   print('  fb_status getStatus()')
@@ -1493,6 +1495,18 @@ elif cmd == 'get_serde':
     sys.exit(1)
   pp.pprint(client.get_serde(eval(args[0]),))
 
+elif cmd == 'get_lock_materialization_rebuild':
+  if len(args) != 3:
+    print('get_lock_materialization_rebuild requires 3 args')
+    sys.exit(1)
+  pp.pprint(client.get_lock_materialization_rebuild(args[0],args[1],eval(args[2]),))
+
+elif cmd == 'heartbeat_lock_materialization_rebuild':
+  if len(args) != 3:
+    print('heartbeat_lock_materialization_rebuild requires 3 args')
+    sys.exit(1)
+  pp.pprint(client.heartbeat_lock_materialization_rebuild(args[0],args[1],eval(args[2]),))
+
 elif cmd == 'getName':
   if len(args) != 0:
     print('getName requires 0 args')

http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore.py
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore.py b/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore.py
index f8ffeac..b0e64d8 100644
--- a/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore.py
+++ b/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore.py
@@ -1548,6 +1548,24 @@ class Iface(fb303.FacebookService.Iface):
     """
     pass
 
+  def get_lock_materialization_rebuild(self, dbName, tableName, txnId):  # interface declaration only
+    """
+    Parameters:
+     - dbName
+     - tableName
+     - txnId
+    """
+    pass  # no behavior here; the Client class in this file overrides this method
+
+  def heartbeat_lock_materialization_rebuild(self, dbName, tableName, txnId):  # interface declaration only
+    """
+    Parameters:
+     - dbName
+     - tableName
+     - txnId
+    """
+    pass  # no behavior here; the Client class in this file overrides this method
+
 
 class Client(fb303.FacebookService.Client, Iface):
   """
@@ -8711,6 +8729,76 @@ class Client(fb303.FacebookService.Client, Iface):
       raise result.o2
     raise TApplicationException(TApplicationException.MISSING_RESULT, "get_serde failed: unknown result")
 
+  def get_lock_materialization_rebuild(self, dbName, tableName, txnId):  # client call: send the request, then read and return the reply
+    """
+    Parameters:
+     - dbName
+     - tableName
+     - txnId
+    """
+    self.send_get_lock_materialization_rebuild(dbName, tableName, txnId)
+    return self.recv_get_lock_materialization_rebuild()  # result.success, or a TApplicationException is raised
+
+  def send_get_lock_materialization_rebuild(self, dbName, tableName, txnId):  # writes one CALL message to the output protocol
+    self._oprot.writeMessageBegin('get_lock_materialization_rebuild', TMessageType.CALL, self._seqid)  # message header
+    args = get_lock_materialization_rebuild_args()  # argument struct carrying the three parameters
+    args.dbName = dbName
+    args.tableName = tableName
+    args.txnId = txnId
+    args.write(self._oprot)
+    self._oprot.writeMessageEnd()
+    self._oprot.trans.flush()  # flush the underlying transport after the message is complete
+
+  def recv_get_lock_materialization_rebuild(self):  # reads the reply; returns result.success or raises
+    iprot = self._iprot
+    (fname, mtype, rseqid) = iprot.readMessageBegin()  # fname and rseqid are unpacked but not checked in this method
+    if mtype == TMessageType.EXCEPTION:  # reply is an exception message: decode it and re-raise
+      x = TApplicationException()
+      x.read(iprot)
+      iprot.readMessageEnd()
+      raise x
+    result = get_lock_materialization_rebuild_result()
+    result.read(iprot)
+    iprot.readMessageEnd()
+    if result.success is not None:
+      return result.success
+    raise TApplicationException(TApplicationException.MISSING_RESULT, "get_lock_materialization_rebuild failed: unknown result")  # reply carried no success value
+
+  def heartbeat_lock_materialization_rebuild(self, dbName, tableName, txnId):  # client call: send the request, then read and return the reply
+    """
+    Parameters:
+     - dbName
+     - tableName
+     - txnId
+    """
+    self.send_heartbeat_lock_materialization_rebuild(dbName, tableName, txnId)
+    return self.recv_heartbeat_lock_materialization_rebuild()  # result.success, or a TApplicationException is raised
+
+  def send_heartbeat_lock_materialization_rebuild(self, dbName, tableName, txnId):  # writes one CALL message to the output protocol
+    self._oprot.writeMessageBegin('heartbeat_lock_materialization_rebuild', TMessageType.CALL, self._seqid)  # message header
+    args = heartbeat_lock_materialization_rebuild_args()  # argument struct carrying the three parameters
+    args.dbName = dbName
+    args.tableName = tableName
+    args.txnId = txnId
+    args.write(self._oprot)
+    self._oprot.writeMessageEnd()
+    self._oprot.trans.flush()  # flush the underlying transport after the message is complete
+
+  def recv_heartbeat_lock_materialization_rebuild(self):  # reads the reply; returns result.success or raises
+    iprot = self._iprot
+    (fname, mtype, rseqid) = iprot.readMessageBegin()  # fname and rseqid are unpacked but not checked in this method
+    if mtype == TMessageType.EXCEPTION:  # reply is an exception message: decode it and re-raise
+      x = TApplicationException()
+      x.read(iprot)
+      iprot.readMessageEnd()
+      raise x
+    result = heartbeat_lock_materialization_rebuild_result()
+    result.read(iprot)
+    iprot.readMessageEnd()
+    if result.success is not None:  # identity test against None, so a False reply is still returned
+      return result.success
+    raise TApplicationException(TApplicationException.MISSING_RESULT, "heartbeat_lock_materialization_rebuild failed: unknown result")  # reply carried no success value
+
 
 class Processor(fb303.FacebookService.Processor, Iface, TProcessor):
   def __init__(self, handler):
@@ -8915,6 +9003,8 @@ class Processor(fb303.FacebookService.Processor, Iface, TProcessor):
     self._processMap["set_schema_version_state"] = Processor.process_set_schema_version_state
     self._processMap["add_serde"] = Processor.process_add_serde
     self._processMap["get_serde"] = Processor.process_get_serde
+    self._processMap["get_lock_materialization_rebuild"] = Processor.process_get_lock_materialization_rebuild
+    self._processMap["heartbeat_lock_materialization_rebuild"] = Processor.process_heartbeat_lock_materialization_rebuild
 
   def process(self, iprot, oprot):
     (name, type, seqid) = iprot.readMessageBegin()
@@ -13889,6 +13979,44 @@ class Processor(fb303.FacebookService.Processor, Iface, TProcessor):
     oprot.writeMessageEnd()
     oprot.trans.flush()
 
+  def process_get_lock_materialization_rebuild(self, seqid, iprot, oprot):  # server side: decode args, call the handler, write the reply
+    args = get_lock_materialization_rebuild_args()
+    args.read(iprot)
+    iprot.readMessageEnd()
+    result = get_lock_materialization_rebuild_result()
+    try:
+      result.success = self._handler.get_lock_materialization_rebuild(args.dbName, args.tableName, args.txnId)
+      msg_type = TMessageType.REPLY
+    except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):  # these propagate unchanged; no reply is written
+      raise
+    except Exception as ex:  # any other handler failure is logged and reported as a generic internal error
+      msg_type = TMessageType.EXCEPTION
+      logging.exception(ex)
+      result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')  # replaces the result struct; the original error text is not sent
+    oprot.writeMessageBegin("get_lock_materialization_rebuild", msg_type, seqid)  # reply reuses the seqid passed in
+    result.write(oprot)
+    oprot.writeMessageEnd()
+    oprot.trans.flush()
+
+  def process_heartbeat_lock_materialization_rebuild(self, seqid, iprot, oprot):  # server side: decode args, call the handler, write the reply
+    args = heartbeat_lock_materialization_rebuild_args()
+    args.read(iprot)
+    iprot.readMessageEnd()
+    result = heartbeat_lock_materialization_rebuild_result()
+    try:
+      result.success = self._handler.heartbeat_lock_materialization_rebuild(args.dbName, args.tableName, args.txnId)
+      msg_type = TMessageType.REPLY
+    except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):  # these propagate unchanged; no reply is written
+      raise
+    except Exception as ex:  # any other handler failure is logged and reported as a generic internal error
+      msg_type = TMessageType.EXCEPTION
+      logging.exception(ex)
+      result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')  # replaces the result struct; the original error text is not sent
+    oprot.writeMessageBegin("heartbeat_lock_materialization_rebuild", msg_type, seqid)  # reply reuses the seqid passed in
+    result.write(oprot)
+    oprot.writeMessageEnd()
+    oprot.trans.flush()
+
 
 # HELPER FUNCTIONS AND STRUCTURES
 
@@ -47281,3 +47409,314 @@ class get_serde_result:
 
   def __ne__(self, other):
     return not (self == other)
+
+class get_lock_materialization_rebuild_args:
+  """
+  Attributes:
+   - dbName
+   - tableName
+   - txnId
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.STRING, 'dbName', None, None, ), # 1
+    (2, TType.STRING, 'tableName', None, None, ), # 2
+    (3, TType.I64, 'txnId', None, None, ), # 3
+  )
+
+  def __init__(self, dbName=None, tableName=None, txnId=None,):
+    self.dbName = dbName
+    self.tableName = tableName
+    self.txnId = txnId
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.STRING:
+          self.dbName = iprot.readString()
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.STRING:
+          self.tableName = iprot.readString()
+        else:
+          iprot.skip(ftype)
+      elif fid == 3:
+        if ftype == TType.I64:
+          self.txnId = iprot.readI64()
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('get_lock_materialization_rebuild_args')
+    if self.dbName is not None:
+      oprot.writeFieldBegin('dbName', TType.STRING, 1)
+      oprot.writeString(self.dbName)
+      oprot.writeFieldEnd()
+    if self.tableName is not None:
+      oprot.writeFieldBegin('tableName', TType.STRING, 2)
+      oprot.writeString(self.tableName)
+      oprot.writeFieldEnd()
+    if self.txnId is not None:
+      oprot.writeFieldBegin('txnId', TType.I64, 3)
+      oprot.writeI64(self.txnId)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.dbName)
+    value = (value * 31) ^ hash(self.tableName)
+    value = (value * 31) ^ hash(self.txnId)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class get_lock_materialization_rebuild_result:
+  """
+  Attributes:
+   - success
+  """
+
+  thrift_spec = (
+    (0, TType.STRUCT, 'success', (LockResponse, LockResponse.thrift_spec), None, ), # 0
+  )
+
+  def __init__(self, success=None,):
+    self.success = success
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 0:
+        if ftype == TType.STRUCT:
+          self.success = LockResponse()
+          self.success.read(iprot)
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('get_lock_materialization_rebuild_result')
+    if self.success is not None:
+      oprot.writeFieldBegin('success', TType.STRUCT, 0)
+      self.success.write(oprot)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.success)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class heartbeat_lock_materialization_rebuild_args:
+  """
+  Attributes:
+   - dbName
+   - tableName
+   - txnId
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.STRING, 'dbName', None, None, ), # 1
+    (2, TType.STRING, 'tableName', None, None, ), # 2
+    (3, TType.I64, 'txnId', None, None, ), # 3
+  )
+
+  def __init__(self, dbName=None, tableName=None, txnId=None,):
+    self.dbName = dbName
+    self.tableName = tableName
+    self.txnId = txnId
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.STRING:
+          self.dbName = iprot.readString()
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.STRING:
+          self.tableName = iprot.readString()
+        else:
+          iprot.skip(ftype)
+      elif fid == 3:
+        if ftype == TType.I64:
+          self.txnId = iprot.readI64()
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('heartbeat_lock_materialization_rebuild_args')
+    if self.dbName is not None:
+      oprot.writeFieldBegin('dbName', TType.STRING, 1)
+      oprot.writeString(self.dbName)
+      oprot.writeFieldEnd()
+    if self.tableName is not None:
+      oprot.writeFieldBegin('tableName', TType.STRING, 2)
+      oprot.writeString(self.tableName)
+      oprot.writeFieldEnd()
+    if self.txnId is not None:
+      oprot.writeFieldBegin('txnId', TType.I64, 3)
+      oprot.writeI64(self.txnId)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.dbName)
+    value = (value * 31) ^ hash(self.tableName)
+    value = (value * 31) ^ hash(self.txnId)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class heartbeat_lock_materialization_rebuild_result:
+  """
+  Attributes:
+   - success
+  """
+
+  thrift_spec = (
+    (0, TType.BOOL, 'success', None, None, ), # 0
+  )
+
+  def __init__(self, success=None,):
+    self.success = success
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 0:
+        if ftype == TType.BOOL:
+          self.success = iprot.readBool()
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('heartbeat_lock_materialization_rebuild_result')
+    if self.success is not None:
+      oprot.writeFieldBegin('success', TType.BOOL, 0)
+      oprot.writeBool(self.success)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.success)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)

http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index 972db1f..f2f61e0 100644
--- a/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -16941,6 +16941,7 @@ class Materialization:
    - tablesUsed
    - validTxnList
    - invalidationTime
+   - sourceTablesUpdateDeleteModified
   """
 
   thrift_spec = (
@@ -16948,12 +16949,14 @@ class Materialization:
     (1, TType.SET, 'tablesUsed', (TType.STRING,None), None, ), # 1
     (2, TType.STRING, 'validTxnList', None, None, ), # 2
     (3, TType.I64, 'invalidationTime', None, None, ), # 3
+    (4, TType.BOOL, 'sourceTablesUpdateDeleteModified', None, None, ), # 4
   )
 
-  def __init__(self, tablesUsed=None, validTxnList=None, invalidationTime=None,):
+  def __init__(self, tablesUsed=None, validTxnList=None, invalidationTime=None, sourceTablesUpdateDeleteModified=None,):
     self.tablesUsed = tablesUsed
     self.validTxnList = validTxnList
     self.invalidationTime = invalidationTime
+    self.sourceTablesUpdateDeleteModified = sourceTablesUpdateDeleteModified
 
   def read(self, iprot):
     if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -16984,6 +16987,11 @@ class Materialization:
           self.invalidationTime = iprot.readI64()
         else:
           iprot.skip(ftype)
+      elif fid == 4:
+        if ftype == TType.BOOL:
+          self.sourceTablesUpdateDeleteModified = iprot.readBool()
+        else:
+          iprot.skip(ftype)
       else:
         iprot.skip(ftype)
       iprot.readFieldEnd()
@@ -17009,14 +17017,16 @@ class Materialization:
       oprot.writeFieldBegin('invalidationTime', TType.I64, 3)
       oprot.writeI64(self.invalidationTime)
       oprot.writeFieldEnd()
+    if self.sourceTablesUpdateDeleteModified is not None:
+      oprot.writeFieldBegin('sourceTablesUpdateDeleteModified', TType.BOOL, 4)
+      oprot.writeBool(self.sourceTablesUpdateDeleteModified)
+      oprot.writeFieldEnd()
     oprot.writeFieldStop()
     oprot.writeStructEnd()
 
   def validate(self):
     if self.tablesUsed is None:
       raise TProtocol.TProtocolException(message='Required field tablesUsed is unset!')
-    if self.invalidationTime is None:
-      raise TProtocol.TProtocolException(message='Required field invalidationTime is unset!')
     return
 
 
@@ -17025,6 +17035,7 @@ class Materialization:
     value = (value * 31) ^ hash(self.tablesUsed)
     value = (value * 31) ^ hash(self.validTxnList)
     value = (value * 31) ^ hash(self.invalidationTime)
+    value = (value * 31) ^ hash(self.sourceTablesUpdateDeleteModified)
     return value
 
   def __repr__(self):

http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
index 94454a1..0e70e89 100644
--- a/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -3814,18 +3814,19 @@ class Materialization
   TABLESUSED = 1
   VALIDTXNLIST = 2
   INVALIDATIONTIME = 3
+  SOURCETABLESUPDATEDELETEMODIFIED = 4
 
   FIELDS = {
     TABLESUSED => {:type => ::Thrift::Types::SET, :name => 'tablesUsed', :element => {:type => ::Thrift::Types::STRING}},
     VALIDTXNLIST => {:type => ::Thrift::Types::STRING, :name => 'validTxnList', :optional => true},
-    INVALIDATIONTIME => {:type => ::Thrift::Types::I64, :name => 'invalidationTime'}
+    INVALIDATIONTIME => {:type => ::Thrift::Types::I64, :name => 'invalidationTime', :optional => true},
+    SOURCETABLESUPDATEDELETEMODIFIED => {:type => ::Thrift::Types::BOOL, :name => 'sourceTablesUpdateDeleteModified', :optional => true}
   }
 
   def struct_fields; FIELDS; end
 
   def validate
     raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field tablesUsed is unset!') unless @tablesUsed
-    raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field invalidationTime is unset!') unless @invalidationTime
   end
 
   ::Thrift::Struct.generate_accessors self

http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb b/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
index c103675..58ebd29 100644
--- a/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
+++ b/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
@@ -3348,6 +3348,36 @@ module ThriftHiveMetastore
       raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'get_serde failed: unknown result')
     end
 
+    def get_lock_materialization_rebuild(dbName, tableName, txnId)
+      send_get_lock_materialization_rebuild(dbName, tableName, txnId)
+      return recv_get_lock_materialization_rebuild()
+    end
+
+    def send_get_lock_materialization_rebuild(dbName, tableName, txnId)
+      send_message('get_lock_materialization_rebuild', Get_lock_materialization_rebuild_args, :dbName => dbName, :tableName => tableName, :txnId => txnId)
+    end
+
+    def recv_get_lock_materialization_rebuild()
+      result = receive_message(Get_lock_materialization_rebuild_result)
+      return result.success unless result.success.nil?
+      raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'get_lock_materialization_rebuild failed: unknown result')
+    end
+
+    def heartbeat_lock_materialization_rebuild(dbName, tableName, txnId)
+      send_heartbeat_lock_materialization_rebuild(dbName, tableName, txnId)
+      return recv_heartbeat_lock_materialization_rebuild()
+    end
+
+    def send_heartbeat_lock_materialization_rebuild(dbName, tableName, txnId)
+      send_message('heartbeat_lock_materialization_rebuild', Heartbeat_lock_materialization_rebuild_args, :dbName => dbName, :tableName => tableName, :txnId => txnId)
+    end
+
+    def recv_heartbeat_lock_materialization_rebuild()
+      result = receive_message(Heartbeat_lock_materialization_rebuild_result)
+      return result.success unless result.success.nil?
+      raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'heartbeat_lock_materialization_rebuild failed: unknown result')
+    end
+
   end
 
   class Processor < ::FacebookService::Processor 
@@ -5875,6 +5905,20 @@ module ThriftHiveMetastore
       write_result(result, oprot, 'get_serde', seqid)
     end
 
+    def process_get_lock_materialization_rebuild(seqid, iprot, oprot)
+      args = read_args(iprot, Get_lock_materialization_rebuild_args)
+      result = Get_lock_materialization_rebuild_result.new()
+      result.success = @handler.get_lock_materialization_rebuild(args.dbName, args.tableName, args.txnId)
+      write_result(result, oprot, 'get_lock_materialization_rebuild', seqid)
+    end
+
+    def process_heartbeat_lock_materialization_rebuild(seqid, iprot, oprot)
+      args = read_args(iprot, Heartbeat_lock_materialization_rebuild_args)
+      result = Heartbeat_lock_materialization_rebuild_result.new()
+      result.success = @handler.heartbeat_lock_materialization_rebuild(args.dbName, args.tableName, args.txnId)
+      write_result(result, oprot, 'heartbeat_lock_materialization_rebuild', seqid)
+    end
+
   end
 
   # HELPER FUNCTIONS AND STRUCTURES
@@ -13301,5 +13345,77 @@ module ThriftHiveMetastore
     ::Thrift::Struct.generate_accessors self
   end
 
+  class Get_lock_materialization_rebuild_args
+    include ::Thrift::Struct, ::Thrift::Struct_Union
+    DBNAME = 1
+    TABLENAME = 2
+    TXNID = 3
+
+    FIELDS = {
+      DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName'},
+      TABLENAME => {:type => ::Thrift::Types::STRING, :name => 'tableName'},
+      TXNID => {:type => ::Thrift::Types::I64, :name => 'txnId'}
+    }
+
+    def struct_fields; FIELDS; end
+
+    def validate
+    end
+
+    ::Thrift::Struct.generate_accessors self
+  end
+
+  class Get_lock_materialization_rebuild_result
+    include ::Thrift::Struct, ::Thrift::Struct_Union
+    SUCCESS = 0
+
+    FIELDS = {
+      SUCCESS => {:type => ::Thrift::Types::STRUCT, :name => 'success', :class => ::LockResponse}
+    }
+
+    def struct_fields; FIELDS; end
+
+    def validate
+    end
+
+    ::Thrift::Struct.generate_accessors self
+  end
+
+  class Heartbeat_lock_materialization_rebuild_args
+    include ::Thrift::Struct, ::Thrift::Struct_Union
+    DBNAME = 1
+    TABLENAME = 2
+    TXNID = 3
+
+    FIELDS = {
+      DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName'},
+      TABLENAME => {:type => ::Thrift::Types::STRING, :name => 'tableName'},
+      TXNID => {:type => ::Thrift::Types::I64, :name => 'txnId'}
+    }
+
+    def struct_fields; FIELDS; end
+
+    def validate
+    end
+
+    ::Thrift::Struct.generate_accessors self
+  end
+
+  class Heartbeat_lock_materialization_rebuild_result
+    include ::Thrift::Struct, ::Thrift::Struct_Union
+    SUCCESS = 0
+
+    FIELDS = {
+      SUCCESS => {:type => ::Thrift::Types::BOOL, :name => 'success'}
+    }
+
+    def struct_fields; FIELDS; end
+
+    def validate
+    end
+
+    ::Thrift::Struct.generate_accessors self
+  end
+
 end
 

http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index c81b8fa..30922ba 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -8367,6 +8367,18 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         endFunction("get_serde", serde != null, ex);
       }
     }
+
+    @Override
+    public LockResponse get_lock_materialization_rebuild(String dbName, String tableName, long txnId)
+        throws TException {
+      return MaterializationsRebuildLockHandler.get().lockResource(dbName, tableName, txnId);
+    }
+
+    @Override
+    public boolean heartbeat_lock_materialization_rebuild(String dbName, String tableName, long txnId)
+        throws TException {
+      return MaterializationsRebuildLockHandler.get().refreshLockResource(dbName, tableName, txnId);
+    }
   }
 
   private static IHMSHandler newRetryingHMSHandler(IHMSHandler baseHandler, Configuration conf)

http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index ebbf465..95a3767 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -3127,4 +3127,14 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
     else if (max <= Short.MAX_VALUE) return (short)max;
     else return Short.MAX_VALUE;
   }
+
+  @Override
+  public LockResponse lockMaterializationRebuild(String dbName, String tableName, long txnId) throws TException {
+    return client.get_lock_materialization_rebuild(dbName, tableName, txnId);
+  }
+
+  @Override
+  public boolean heartbeatLockMaterializationRebuild(String dbName, String tableName, long txnId) throws TException {
+    return client.heartbeat_lock_materialization_rebuild(dbName, tableName, txnId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index b2c40c2..98674cf 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -3613,4 +3613,25 @@ public interface IMetaStoreClient {
    * @throws TException general thrift error
    */
   SerDeInfo getSerDe(String serDeName) throws TException;
+
+  /**
+   * Acquire the materialization rebuild lock for a given view. We need to specify the fully
+   * qualified name of the materialized view and the open transaction ID so we can uniquely
+   * identify the lock.
+   * @param dbName db name for the materialized view
+   * @param tableName table name for the materialized view
+   * @param txnId transaction id for the rebuild
+   * @return the response from the metastore, where the lock id is equal to the txn id and
+   * the status can be either ACQUIRED or NOT_ACQUIRED
+   */
+  LockResponse lockMaterializationRebuild(String dbName, String tableName, long txnId) throws TException;
+
+  /**
+   * Method to refresh the acquisition of a given materialization rebuild lock.
+   * @param dbName db name for the materialized view
+   * @param tableName table name for the materialized view
+   * @param txnId transaction id for the rebuild
+   * @return true if the lock could be renewed, false otherwise
+   */
+  boolean heartbeatLockMaterializationRebuild(String dbName, String tableName, long txnId) throws TException;
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationInvalidationInfo.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationInvalidationInfo.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationInvalidationInfo.java
deleted file mode 100644
index 3d77407..0000000
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationInvalidationInfo.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.metastore;
-
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.hadoop.hive.metastore.api.Materialization;
-import org.apache.hadoop.hive.metastore.api.Table;
-
-/**
- * Contains information about the invalidation of a materialization,
- * including the materialization name, the tables that it uses, and
- * the invalidation time, i.e., the first moment t0 after the
- * materialization was created at which one of the tables that it uses
- * was modified.
- */
-@SuppressWarnings("serial")
-public class MaterializationInvalidationInfo extends Materialization {
-
-  private AtomicLong invalidationTime;
-
-  public MaterializationInvalidationInfo(Set<String> tablesUsed, String validTxnList) {
-    super(tablesUsed, 0);
-    this.setValidTxnList(validTxnList);
-    this.invalidationTime = new AtomicLong(0);
-  }
-
-  public boolean compareAndSetInvalidationTime(long expect, long update) {
-    boolean success = invalidationTime.compareAndSet(expect, update);
-    if (success) {
-      super.setInvalidationTime(update);
-    }
-    return success;
-  }
-
-  public long getInvalidationTime() {
-    return invalidationTime.get();
-  }
-
-  public void setInvalidationTime(long invalidationTime) {
-    throw new UnsupportedOperationException("You should call compareAndSetInvalidationTime instead");
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java
index 80cb1de..99c5abc 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java
@@ -26,15 +26,17 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.ValidReadTxnList;
-import org.apache.hadoop.hive.common.ValidTxnList;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
+import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.api.BasicTxnInfo;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
 import org.apache.hadoop.hive.metastore.api.Materialization;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -65,10 +67,12 @@ public final class MaterializationsInvalidationCache {
 
   /* Key is the database name. Each value is a map from the unique view qualified name to
    * the materialization invalidation info. This invalidation object contains information
-   * such as the tables used by the materialized view or the invalidation time, i.e., first
-   * modification of the tables used by materialized view after the view was created. */
-  private final ConcurrentMap<String, ConcurrentMap<String, MaterializationInvalidationInfo>> materializations =
-      new ConcurrentHashMap<String, ConcurrentMap<String, MaterializationInvalidationInfo>>();
+   * such as the tables used by the materialized view, whether there was any update or
+   * delete in the source tables since the materialized view was created or rebuilt,
+   * or the invalidation time, i.e., first modification of the tables used by materialized
+   * view after the view was created. */
+  private final ConcurrentMap<String, ConcurrentMap<String, Materialization>> materializations =
+      new ConcurrentHashMap<>();
 
   /*
    * Key is a qualified table name. The value is a (sorted) tree map (supporting concurrent
@@ -77,7 +81,10 @@ public final class MaterializationsInvalidationCache {
    * materialization.
    */
   private final ConcurrentMap<String, ConcurrentSkipListMap<Long, Long>> tableModifications =
-      new ConcurrentHashMap<String, ConcurrentSkipListMap<Long, Long>>();
+      new ConcurrentHashMap<>();
+
+  private final ConcurrentMap<String, ConcurrentSkipListSet<Long>> updateDeleteTableModifications =
+      new ConcurrentHashMap<>();
 
   /* Whether the cache has been initialized or not. */
   private boolean initialized;
@@ -188,9 +195,9 @@ public final class MaterializationsInvalidationCache {
       return;
     }
     // We are going to create the map for each view in the given database
-    ConcurrentMap<String, MaterializationInvalidationInfo> cq =
-        new ConcurrentHashMap<String, MaterializationInvalidationInfo>();
-    final ConcurrentMap<String, MaterializationInvalidationInfo> prevCq = materializations.putIfAbsent(
+    ConcurrentMap<String, Materialization> cq =
+        new ConcurrentHashMap<String, Materialization>();
+    final ConcurrentMap<String, Materialization> prevCq = materializations.putIfAbsent(
         dbName, cq);
     if (prevCq != null) {
       cq = prevCq;
@@ -204,13 +211,15 @@ public final class MaterializationsInvalidationCache {
     }
     if (opType == OpType.CREATE || opType == OpType.ALTER) {
       // You store the materialized view
-      cq.put(tableName, new MaterializationInvalidationInfo(tablesUsed, validTxnList));
+      Materialization materialization = new Materialization(tablesUsed);
+      materialization.setValidTxnList(validTxnList);
+      cq.put(tableName, materialization);
     } else {
-      ValidTxnList txnList = new ValidReadTxnList(validTxnList);
+      ValidTxnWriteIdList txnList = new ValidTxnWriteIdList(validTxnList);
       for (String qNameTableUsed : tablesUsed) {
+        ValidWriteIdList tableTxnList = txnList.getTableValidWriteIdList(qNameTableUsed);
         // First we insert a new tree set to keep table modifications, unless it already exists
-        ConcurrentSkipListMap<Long, Long> modificationsTree =
-                new ConcurrentSkipListMap<Long, Long>();
+        ConcurrentSkipListMap<Long, Long> modificationsTree = new ConcurrentSkipListMap<>();
         final ConcurrentSkipListMap<Long, Long> prevModificationsTree = tableModifications.putIfAbsent(
                 qNameTableUsed, modificationsTree);
         if (prevModificationsTree != null) {
@@ -222,7 +231,7 @@ public final class MaterializationsInvalidationCache {
         try {
           String[] names =  qNameTableUsed.split("\\.");
           BasicTxnInfo e = handler.getTxnHandler().getFirstCompletedTransactionForTableAfterCommit(
-                  names[0], names[1], txnList);
+                  names[0], names[1], tableTxnList);
           if (!e.isIsnull()) {
             modificationsTree.put(e.getTxnid(), e.getTime());
             // We do not need to do anything more for current table, as we detected
@@ -236,7 +245,9 @@ public final class MaterializationsInvalidationCache {
         }
       }
       // For LOAD, you only add it if it does exist as you might be loading an outdated MV
-      cq.putIfAbsent(tableName, new MaterializationInvalidationInfo(tablesUsed, validTxnList));
+      Materialization materialization = new Materialization(tablesUsed);
+      materialization.setValidTxnList(validTxnList);
+      cq.putIfAbsent(tableName, materialization);
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug("Cached materialized view for rewriting in invalidation cache: " +
@@ -249,7 +260,7 @@ public final class MaterializationsInvalidationCache {
    * invalidation for the MVs that use that table.
    */
   public void notifyTableModification(String dbName, String tableName,
-      long txnId, long newModificationTime) {
+      long txnId, long newModificationTime, boolean isUpdateDelete) {
     if (disable) {
       // Nothing to do
       return;
@@ -258,8 +269,18 @@ public final class MaterializationsInvalidationCache {
       LOG.debug("Notification for table {} in database {} received -> id: {}, time: {}",
           tableName, dbName, txnId, newModificationTime);
     }
-    ConcurrentSkipListMap<Long, Long> modificationsTree =
-        new ConcurrentSkipListMap<Long, Long>();
+    if (isUpdateDelete) {
+      // We update first the update/delete modifications record
+      ConcurrentSkipListSet<Long> modificationsSet = new ConcurrentSkipListSet<>();
+      final ConcurrentSkipListSet<Long> prevModificationsSet =
+          updateDeleteTableModifications.putIfAbsent(Warehouse.getQualifiedName(dbName, tableName),
+              modificationsSet);
+      if (prevModificationsSet != null) {
+        modificationsSet = prevModificationsSet;
+      }
+      modificationsSet.add(txnId);
+    }
+    ConcurrentSkipListMap<Long, Long> modificationsTree = new ConcurrentSkipListMap<>();
     final ConcurrentSkipListMap<Long, Long> prevModificationsTree =
         tableModifications.putIfAbsent(Warehouse.getQualifiedName(dbName, tableName), modificationsTree);
     if (prevModificationsTree != null) {
@@ -293,30 +314,21 @@ public final class MaterializationsInvalidationCache {
     if (materializations.get(dbName) != null) {
       ImmutableMap.Builder<String, Materialization> m = ImmutableMap.builder();
       for (String materializationName : materializationNames) {
-        MaterializationInvalidationInfo materialization =
+        Materialization materialization =
             materializations.get(dbName).get(materializationName);
         if (materialization == null) {
           LOG.debug("Materialization {} skipped as there is no information "
               + "in the invalidation cache about it", materializationName);
           continue;
         }
-        long invalidationTime = getInvalidationTime(materialization);
-        // We need to check whether previous value is zero, as data modification
-        // in another table used by the materialized view might have modified
-        // the value too
-        boolean modified = materialization.compareAndSetInvalidationTime(0L, invalidationTime);
-        while (!modified) {
-          long currentInvalidationTime = materialization.getInvalidationTime();
-          if (invalidationTime < currentInvalidationTime) {
-            // It was set by other table modification, but it was after this table modification
-            // hence we need to set it
-            modified = materialization.compareAndSetInvalidationTime(currentInvalidationTime, invalidationTime);
-          } else {
-            // Nothing to do
-            modified = true;
-          }
-        }
-        m.put(materializationName, materialization);
+        // We create a deep copy of the materialization, as we need to set the time
+        // and whether any update/delete operation happened on the tables that it uses
+        // since it was created.
+        Materialization materializationCopy = new Materialization(
+            materialization.getTablesUsed());
+        materializationCopy.setValidTxnList(materialization.getValidTxnList());
+        enrichWithInvalidationInfo(materializationCopy);
+        m.put(materializationName, materializationCopy);
       }
       Map<String, Materialization> result = m.build();
       if (LOG.isDebugEnabled()) {
@@ -327,50 +339,65 @@ public final class MaterializationsInvalidationCache {
     return ImmutableMap.of();
   }
 
-  private long getInvalidationTime(MaterializationInvalidationInfo materialization) {
-    String txnListString = materialization.getValidTxnList();
-    if (txnListString == null) {
+  private void enrichWithInvalidationInfo(Materialization materialization) {
+    String materializationTxnListString = materialization.getValidTxnList();
+    if (materializationTxnListString == null) {
       // This can happen when the materialization was created on non-transactional tables
-      return Long.MIN_VALUE;
+      materialization.setInvalidationTime(Long.MIN_VALUE);
+      return;
     }
 
     // We will obtain the modification time as follows.
     // First, we obtain the first element after high watermark (if any)
     // Then, we iterate through the elements from min open txn till high
     // watermark, updating the modification time after creation if needed
-    ValidTxnList txnList = new ValidReadTxnList(txnListString);
+    ValidTxnWriteIdList materializationTxnList = new ValidTxnWriteIdList(materializationTxnListString);
     long firstModificationTimeAfterCreation = 0L;
+    boolean containsUpdateDelete = false;
     for (String qNameTableUsed : materialization.getTablesUsed()) {
-      final Entry<Long, Long> tn = tableModifications.get(qNameTableUsed)
-          .higherEntry(txnList.getHighWatermark());
+      final ValidWriteIdList tableMaterializationTxnList =
+          materializationTxnList.getTableValidWriteIdList(qNameTableUsed);
+
+      final ConcurrentSkipListMap<Long, Long> usedTableModifications =
+          tableModifications.get(qNameTableUsed);
+      final ConcurrentSkipListSet<Long> usedUDTableModifications =
+          updateDeleteTableModifications.get(qNameTableUsed);
+      final Entry<Long, Long> tn = usedTableModifications.higherEntry(tableMaterializationTxnList.getHighWatermark());
       if (tn != null) {
         if (firstModificationTimeAfterCreation == 0L ||
             tn.getValue() < firstModificationTimeAfterCreation) {
           firstModificationTimeAfterCreation = tn.getValue();
         }
+        // Check if there was any update/delete after creation (accumulated over all tables used)
+        containsUpdateDelete = containsUpdateDelete || (usedUDTableModifications != null &&
+            !usedUDTableModifications.tailSet(tableMaterializationTxnList.getHighWatermark(), false).isEmpty());
       }
       // Min open txn might be null if there were no open transactions
       // when this transaction was being executed
-      if (txnList.getMinOpenTxn() != null) {
+      if (tableMaterializationTxnList.getMinOpenWriteId() != null) {
         // Invalid transaction list is sorted
         int pos = 0;
-        for (Map.Entry<Long, Long> t : tableModifications.get(qNameTableUsed)
-                .subMap(txnList.getMinOpenTxn(), txnList.getHighWatermark()).entrySet()) {
-          while (pos < txnList.getInvalidTransactions().length &&
-              txnList.getInvalidTransactions()[pos] != t.getKey()) {
+        for (Map.Entry<Long, Long> t : usedTableModifications
+                .subMap(tableMaterializationTxnList.getMinOpenWriteId(), tableMaterializationTxnList.getHighWatermark()).entrySet()) {
+          while (pos < tableMaterializationTxnList.getInvalidWriteIds().length &&
+              tableMaterializationTxnList.getInvalidWriteIds()[pos] != t.getKey()) {
             pos++;
           }
-          if (pos >= txnList.getInvalidTransactions().length) {
+          if (pos >= tableMaterializationTxnList.getInvalidWriteIds().length) {
             break;
           }
           if (firstModificationTimeAfterCreation == 0L ||
               t.getValue() < firstModificationTimeAfterCreation) {
             firstModificationTimeAfterCreation = t.getValue();
           }
+          containsUpdateDelete = containsUpdateDelete ||
+              (usedUDTableModifications != null && usedUDTableModifications.contains(t.getKey()));
         }
       }
     }
-    return firstModificationTimeAfterCreation;
+
+    materialization.setInvalidationTime(firstModificationTimeAfterCreation);
+    materialization.setSourceTablesUpdateDeleteModified(containsUpdateDelete);
   }
 
   private enum OpType {
@@ -395,16 +422,17 @@ public final class MaterializationsInvalidationCache {
     // We execute the cleanup in two steps
     // First we gather all the transactions that need to be kept
     final Multimap<String, Long> keepTxnInfos = HashMultimap.create();
-    for (Map.Entry<String, ConcurrentMap<String, MaterializationInvalidationInfo>> e : materializations.entrySet()) {
-      for (MaterializationInvalidationInfo m : e.getValue().values()) {
-        ValidTxnList txnList = new ValidReadTxnList(m.getValidTxnList());
+    for (Map.Entry<String, ConcurrentMap<String, Materialization>> e : materializations.entrySet()) {
+      for (Materialization m : e.getValue().values()) {
+        ValidTxnWriteIdList txnList = new ValidTxnWriteIdList(m.getValidTxnList());
         boolean canBeDeleted = false;
         String currentTableForInvalidatingTxn = null;
         long currentInvalidatingTxnId = 0L;
         long currentInvalidatingTxnTime = 0L;
         for (String qNameTableUsed : m.getTablesUsed()) {
+          ValidWriteIdList tableTxnList = txnList.getTableValidWriteIdList(qNameTableUsed);
           final Entry<Long, Long> tn = tableModifications.get(qNameTableUsed)
-              .higherEntry(txnList.getHighWatermark());
+              .higherEntry(tableTxnList.getHighWatermark());
           if (tn != null) {
             if (currentInvalidatingTxnTime == 0L ||
                 tn.getValue() < currentInvalidatingTxnTime) {
@@ -424,16 +452,16 @@ public final class MaterializationsInvalidationCache {
               currentInvalidatingTxnTime = tn.getValue();
             }
           }
-          if (txnList.getMinOpenTxn() != null) {
+          if (tableTxnList.getMinOpenWriteId() != null) {
             // Invalid transaction list is sorted
             int pos = 0;
             for (Entry<Long, Long> t : tableModifications.get(qNameTableUsed)
-                .subMap(txnList.getMinOpenTxn(), txnList.getHighWatermark()).entrySet()) {
-              while (pos < txnList.getInvalidTransactions().length &&
-                  txnList.getInvalidTransactions()[pos] != t.getKey()) {
+                .subMap(tableTxnList.getMinOpenWriteId(), tableTxnList.getHighWatermark()).entrySet()) {
+              while (pos < tableTxnList.getInvalidWriteIds().length &&
+                  tableTxnList.getInvalidWriteIds()[pos] != t.getKey()) {
                 pos++;
               }
-              if (pos >= txnList.getInvalidTransactions().length) {
+              if (pos >= tableTxnList.getInvalidWriteIds().length) {
                 break;
               }
               if (currentInvalidatingTxnTime == 0L ||
@@ -462,6 +490,7 @@ public final class MaterializationsInvalidationCache {
     long removed = 0L;
     for (Entry<String, ConcurrentSkipListMap<Long, Long>> e : tableModifications.entrySet()) {
       Collection<Long> c = keepTxnInfos.get(e.getKey());
+      ConcurrentSkipListSet<Long> updateDeleteForTable = updateDeleteTableModifications.get(e.getKey());
       for (Iterator<Entry<Long, Long>> it = e.getValue().entrySet().iterator(); it.hasNext();) {
         Entry<Long, Long> v = it.next();
         // We need to check again the time because some of the transactions might not be explored
@@ -472,6 +501,9 @@ public final class MaterializationsInvalidationCache {
             LOG.debug("Transaction removed from cache for table {} -> id: {}, time: {}",
                 e.getKey(), v.getKey(), v.getValue());
           }
+          if (updateDeleteForTable != null) {
+            updateDeleteForTable.remove(v.getKey());
+          }
           it.remove();
           removed++;
         }
@@ -480,4 +512,23 @@ public final class MaterializationsInvalidationCache {
     return removed;
   }
 
+  /**
+   * Checks whether the given materialization exists in the invalidation cache.
+   * @param dbName the database name for the materialization
+   * @param tblName the table name for the materialization
+   * @return true if we have information about the materialization in the cache,
+   * false otherwise
+   */
+  public boolean containsMaterialization(String dbName, String tblName) {
+    // A disabled cache, or an incomplete name, can never match an entry
+    final boolean lookupPossible = !disable && dbName != null && tblName != null;
+    if (!lookupPossible) {
+      return false;
+    }
+    final ConcurrentMap<String, Materialization> cachedForDb = materializations.get(dbName);
+    // No materialization cached under this name (e.g., it is a regular table)
+    final boolean cached = cachedForDb != null && cachedForDb.get(tblName) != null;
+    return cached;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java
new file mode 100644
index 0000000..8ca9ede
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Cleaner for the {@link MaterializationsRebuildLockHandler}. It removes outdated locks
+ * in the intervals specified by the input property.
+ */
+public class MaterializationsRebuildLockCleanerTask implements MetastoreTaskThread {
+  private static final Logger LOG = LoggerFactory.getLogger(MaterializationsRebuildLockCleanerTask.class);
+
+  private Configuration conf;
+
+  @Override
+  public long runFrequency(TimeUnit unit) {
+    return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_TIMEOUT, unit) / 2;
+  }
+
+  @Override
+  public void setConf(Configuration configuration) {
+    conf = configuration;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void run() {
+    long removedCnt = MaterializationsRebuildLockHandler.get().cleanupResourceLocks(
+        MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS));
+    // The transaction timeout (in milliseconds) is used as the lock expiration time; the
+    // outcome is only reported when something was removed and debug logging is enabled
+    if (removedCnt > 0 && LOG.isDebugEnabled()) {
+      LOG.debug("Number of materialization locks deleted: " + removedCnt);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockHandler.java
new file mode 100644
index 0000000..dd31226
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockHandler.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This is a lock handler implementation for the materializations rebuild.
+ * It is lightweight: it does not persist any information to metastore db.
+ * Its states are as follows:
+ * 1) request lock -> 2) ACQUIRED -> 4) COMMIT_READY -> 6) release lock
+ *                                -> 5) EXPIRED      ->
+ *                 -> 3) NOT_ACQUIRED
+ * First, the rebuild operation will ACQUIRE the lock. If another rebuild
+ * operation for the same materialization is already running, the lock status
+ * will be NOT_ACQUIRED.
+ * Before committing the rebuild, the txn handler will signal the handler
+ * that it is ready to commit the resource (move state to COMMIT_READY).
+ * We make sure the lock is still available before moving to the new state.
+ * A lock will not be able to expire when it is in COMMIT_READY state.
+ * The unlock method is always called by the txn handler, no matter whether
+ * the transaction succeeds or not, e.g., due to an Exception.
+ * From ACQUIRED, locks can be also moved to EXPIRED state when they
+ * expire. From EXPIRED, they can only be released.
+ */
+public class MaterializationsRebuildLockHandler {
+
+  /* Singleton */
+  private static final MaterializationsRebuildLockHandler SINGLETON = new MaterializationsRebuildLockHandler();
+  /* Locks currently registered, keyed by the qualified name of the materialization */
+  private final ConcurrentMap<String, ResourceLock> locks = new ConcurrentHashMap<>();
+
+  private MaterializationsRebuildLockHandler() {
+  }
+
+  /**
+   * Get instance of MaterializationsRebuildLockHandler.
+   *
+   * @return the singleton
+   */
+  public static MaterializationsRebuildLockHandler get() {
+    return SINGLETON;
+  }
+
+  /**
+   * Lock materialized view (first step for rebuild). Response contains a lock id
+   * that corresponds to the input transaction id, and whether the lock was
+   * ACQUIRED or NOT_ACQUIRED.
+   * @param dbName the db name of the materialization
+   * @param tableName the table name of the materialization
+   * @param txnId the transaction id for the rebuild
+   * @return the response to the lock request
+   */
+  public LockResponse lockResource(String dbName, String tableName, long txnId) {
+    // Initial heartbeat uses the same clock (wall-clock millis) as refresh and cleanup
+    final ResourceLock prevResourceLock = locks.putIfAbsent(
+        Warehouse.getQualifiedName(dbName, tableName),
+        new ResourceLock(txnId, System.currentTimeMillis(), State.ACQUIRED));
+    // A previous mapping means that a lock is already registered for this resource
+    return new LockResponse(txnId,
+        prevResourceLock != null ? LockState.NOT_ACQUIRED : LockState.ACQUIRED);
+  }
+
+  /**
+   * Moves from ACQUIRED state to COMMIT_READY.
+   * @param dbName the db name of the materialization
+   * @param tableName the table name of the materialization
+   * @param txnId the transaction id for the rebuild
+   * @return true if the lock was still active and we could move the materialization
+   * to COMMIT_READY state, false otherwise
+   */
+  public boolean readyToCommitResource(String dbName, String tableName, long txnId) {
+    final ResourceLock prevResourceLock = locks.get(Warehouse.getQualifiedName(dbName, tableName));
+    if (prevResourceLock == null || prevResourceLock.txnId != txnId) {
+      // Lock was outdated and it was removed (then maybe another transaction picked it up)
+      return false;
+    }
+    return prevResourceLock.state.compareAndSet(State.ACQUIRED, State.COMMIT_READY);
+  }
+
+  /**
+   * Heartbeats a certain lock and refreshes its timer.
+   * @param dbName the db name of the materialization
+   * @param tableName the table name of the materialization
+   * @param txnId the transaction id for the rebuild
+   * @return true if the lock is held by txnId in ACQUIRED state and was refreshed, false otherwise
+   */
+  public boolean refreshLockResource(String dbName, String tableName, long txnId) {
+    final ResourceLock prevResourceLock = locks.get(Warehouse.getQualifiedName(dbName, tableName));
+    if (prevResourceLock == null || prevResourceLock.txnId != txnId ||
+        prevResourceLock.state.get() != State.ACQUIRED) {
+      // Lock was outdated and it was removed (then maybe another transaction picked it up)
+      // or changed its state
+      return false;
+    }
+    prevResourceLock.lastHeartBeatTime.set(System.currentTimeMillis());
+    return true;
+  }
+
+  /**
+   * Releases a certain lock. Only the transaction that holds the lock can release it.
+   * @param dbName the db name of the materialization
+   * @param tableName the table name of the materialization
+   * @param txnId the transaction id for the rebuild
+   * @return true if the lock could be released properly, false otherwise
+   * (no lock registered for the resource, or lock held by a different transaction)
+   */
+  public boolean unlockResource(String dbName, String tableName, long txnId) {
+    final String fullyQualifiedName = Warehouse.getQualifiedName(dbName, tableName);
+    final ResourceLock prevResourceLock = locks.get(fullyQualifiedName);
+    if (prevResourceLock == null || prevResourceLock.txnId != txnId) {
+      return false;
+    }
+    return locks.remove(fullyQualifiedName, prevResourceLock);
+  }
+
+  /**
+   * Method that removes from the handler those locks that have expired.
+   * @param timeout time (in milliseconds) after which we consider the locks to have expired
+   * @return the number of locks that were removed
+   */
+  public long cleanupResourceLocks(long timeout) {
+    long removed = 0L;
+    final long currentTime = System.currentTimeMillis();
+    for (Iterator<Map.Entry<String, ResourceLock>> it = locks.entrySet().iterator(); it.hasNext();) {
+      final ResourceLock resourceLock = it.next().getValue();
+      if (currentTime - resourceLock.lastHeartBeatTime.get() > timeout) {
+        if (resourceLock.state.compareAndSet(State.ACQUIRED, State.EXPIRED)) {
+          it.remove();
+          removed++;
+        }
+      }
+    }
+    return removed;
+  }
+
+  /**
+   * This class represents a lock that consists of transaction id,
+   * last refresh time (wall-clock milliseconds), and state.
+   */
+  private class ResourceLock {
+    final long txnId;
+    final AtomicLong lastHeartBeatTime;
+    final AtomicStateEnum state;
+
+    ResourceLock(long txnId, long lastHeartBeatTime, State state) {
+      this.txnId = txnId;
+      this.lastHeartBeatTime = new AtomicLong(lastHeartBeatTime);
+      this.state = new AtomicStateEnum(state);
+    }
+  }
+
+  private enum State {
+    // This is the initial state for a lock
+    ACQUIRED,
+    // This means that the lock is being committed at this instant, hence
+    // the cleaner should not remove it even if it times out. If transaction
+    // fails, the finally clause will remove the lock
+    COMMIT_READY,
+    // This means that the lock is ready to be cleaned, hence it cannot
+    // be committed anymore
+    EXPIRED;
+  }
+
+  /**
+   * Wrapper class around State enum to make its operations atomic.
+   */
+  private class AtomicStateEnum {
+    private final AtomicReference<State> ref;
+
+    public AtomicStateEnum(final State initialValue) {
+      this.ref = new AtomicReference<State>(initialValue);
+    }
+
+    public void set(final State newValue) {
+      this.ref.set(newValue);
+    }
+
+    public State get() {
+      return this.ref.get();
+    }
+
+    public State getAndSet(final State newValue) {
+      return this.ref.getAndSet(newValue);
+    }
+
+    public boolean compareAndSet(final State expect, final State update) {
+      return this.ref.compareAndSet(expect, update);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 940a1bf..f007261 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.DefaultStorageSchemaReader;
 import org.apache.hadoop.hive.metastore.HiveAlterHandler;
 import org.apache.hadoop.hive.metastore.MaterializationsCacheCleanerTask;
+import org.apache.hadoop.hive.metastore.MaterializationsRebuildLockCleanerTask;
 import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
 import org.apache.hadoop.hive.metastore.events.EventCleanerTask;
 import org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager;
@@ -728,7 +729,8 @@ public class MetastoreConf {
     TASK_THREADS_ALWAYS("metastore.task.threads.always", "metastore.task.threads.always",
         EventCleanerTask.class.getName() + "," +
         "org.apache.hadoop.hive.metastore.repl.DumpDirCleanerTask" + "," +
-        MaterializationsCacheCleanerTask.class.getName(),
+        MaterializationsCacheCleanerTask.class.getName() + "," +
+        MaterializationsRebuildLockCleanerTask.class.getName(),
         "Comma separated list of tasks that will be started in separate threads.  These will " +
             "always be started, regardless of whether the metastore is running in embedded mode " +
             "or in server mode.  They must implement " + MetastoreTaskThread.class.getName()),