You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jc...@apache.org on 2018/04/10 09:29:40 UTC
[02/24] hive git commit: HIVE-18839: Implement incremental rebuild
for materialized views (only insert operations in source tables) (Jesus
Camacho Rodriguez, reviewed by Ashutosh Chauhan)
http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/gen/thrift/gen-php/metastore/ThriftHiveMetastore.php
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/gen/thrift/gen-php/metastore/ThriftHiveMetastore.php b/standalone-metastore/src/gen/thrift/gen-php/metastore/ThriftHiveMetastore.php
index 5e3dff1..7a8a42a 100644
--- a/standalone-metastore/src/gen/thrift/gen-php/metastore/ThriftHiveMetastore.php
+++ b/standalone-metastore/src/gen/thrift/gen-php/metastore/ThriftHiveMetastore.php
@@ -1514,6 +1514,20 @@ interface ThriftHiveMetastoreIf extends \FacebookServiceIf {
* @throws \metastore\MetaException
*/
public function get_serde(\metastore\GetSerdeRequest $rqst);
+ /**
+ * @param string $dbName
+ * @param string $tableName
+ * @param int $txnId
+ * @return \metastore\LockResponse
+ */
+ public function get_lock_materialization_rebuild($dbName, $tableName, $txnId);
+ /**
+ * @param string $dbName
+ * @param string $tableName
+ * @param int $txnId
+ * @return bool
+ */
+ public function heartbeat_lock_materialization_rebuild($dbName, $tableName, $txnId);
}
class ThriftHiveMetastoreClient extends \FacebookServiceClient implements \metastore\ThriftHiveMetastoreIf {
@@ -12887,6 +12901,112 @@ class ThriftHiveMetastoreClient extends \FacebookServiceClient implements \metas
throw new \Exception("get_serde failed: unknown result");
}
+ public function get_lock_materialization_rebuild($dbName, $tableName, $txnId)
+ {
+ $this->send_get_lock_materialization_rebuild($dbName, $tableName, $txnId);
+ return $this->recv_get_lock_materialization_rebuild();
+ }
+
+ public function send_get_lock_materialization_rebuild($dbName, $tableName, $txnId)
+ {
+ $args = new \metastore\ThriftHiveMetastore_get_lock_materialization_rebuild_args();
+ $args->dbName = $dbName;
+ $args->tableName = $tableName;
+ $args->txnId = $txnId;
+ $bin_accel = ($this->output_ instanceof TBinaryProtocolAccelerated) && function_exists('thrift_protocol_write_binary');
+ if ($bin_accel)
+ {
+ thrift_protocol_write_binary($this->output_, 'get_lock_materialization_rebuild', TMessageType::CALL, $args, $this->seqid_, $this->output_->isStrictWrite());
+ }
+ else
+ {
+ $this->output_->writeMessageBegin('get_lock_materialization_rebuild', TMessageType::CALL, $this->seqid_);
+ $args->write($this->output_);
+ $this->output_->writeMessageEnd();
+ $this->output_->getTransport()->flush();
+ }
+ }
+
+ public function recv_get_lock_materialization_rebuild()
+ {
+ $bin_accel = ($this->input_ instanceof TBinaryProtocolAccelerated) && function_exists('thrift_protocol_read_binary');
+ if ($bin_accel) $result = thrift_protocol_read_binary($this->input_, '\metastore\ThriftHiveMetastore_get_lock_materialization_rebuild_result', $this->input_->isStrictRead());
+ else
+ {
+ $rseqid = 0;
+ $fname = null;
+ $mtype = 0;
+
+ $this->input_->readMessageBegin($fname, $mtype, $rseqid);
+ if ($mtype == TMessageType::EXCEPTION) {
+ $x = new TApplicationException();
+ $x->read($this->input_);
+ $this->input_->readMessageEnd();
+ throw $x;
+ }
+ $result = new \metastore\ThriftHiveMetastore_get_lock_materialization_rebuild_result();
+ $result->read($this->input_);
+ $this->input_->readMessageEnd();
+ }
+ if ($result->success !== null) {
+ return $result->success;
+ }
+ throw new \Exception("get_lock_materialization_rebuild failed: unknown result");
+ }
+
+ public function heartbeat_lock_materialization_rebuild($dbName, $tableName, $txnId)
+ {
+ $this->send_heartbeat_lock_materialization_rebuild($dbName, $tableName, $txnId);
+ return $this->recv_heartbeat_lock_materialization_rebuild();
+ }
+
+ public function send_heartbeat_lock_materialization_rebuild($dbName, $tableName, $txnId)
+ {
+ $args = new \metastore\ThriftHiveMetastore_heartbeat_lock_materialization_rebuild_args();
+ $args->dbName = $dbName;
+ $args->tableName = $tableName;
+ $args->txnId = $txnId;
+ $bin_accel = ($this->output_ instanceof TBinaryProtocolAccelerated) && function_exists('thrift_protocol_write_binary');
+ if ($bin_accel)
+ {
+ thrift_protocol_write_binary($this->output_, 'heartbeat_lock_materialization_rebuild', TMessageType::CALL, $args, $this->seqid_, $this->output_->isStrictWrite());
+ }
+ else
+ {
+ $this->output_->writeMessageBegin('heartbeat_lock_materialization_rebuild', TMessageType::CALL, $this->seqid_);
+ $args->write($this->output_);
+ $this->output_->writeMessageEnd();
+ $this->output_->getTransport()->flush();
+ }
+ }
+
+ public function recv_heartbeat_lock_materialization_rebuild()
+ {
+ $bin_accel = ($this->input_ instanceof TBinaryProtocolAccelerated) && function_exists('thrift_protocol_read_binary');
+ if ($bin_accel) $result = thrift_protocol_read_binary($this->input_, '\metastore\ThriftHiveMetastore_heartbeat_lock_materialization_rebuild_result', $this->input_->isStrictRead());
+ else
+ {
+ $rseqid = 0;
+ $fname = null;
+ $mtype = 0;
+
+ $this->input_->readMessageBegin($fname, $mtype, $rseqid);
+ if ($mtype == TMessageType::EXCEPTION) {
+ $x = new TApplicationException();
+ $x->read($this->input_);
+ $this->input_->readMessageEnd();
+ throw $x;
+ }
+ $result = new \metastore\ThriftHiveMetastore_heartbeat_lock_materialization_rebuild_result();
+ $result->read($this->input_);
+ $this->input_->readMessageEnd();
+ }
+ if ($result->success !== null) {
+ return $result->success;
+ }
+ throw new \Exception("heartbeat_lock_materialization_rebuild failed: unknown result");
+ }
+
}
// HELPER FUNCTIONS AND STRUCTURES
@@ -57981,4 +58101,401 @@ class ThriftHiveMetastore_get_serde_result {
}
+class ThriftHiveMetastore_get_lock_materialization_rebuild_args {
+ static $_TSPEC;
+
+ /**
+ * @var string
+ */
+ public $dbName = null;
+ /**
+ * @var string
+ */
+ public $tableName = null;
+ /**
+ * @var int
+ */
+ public $txnId = null;
+
+ public function __construct($vals=null) {
+ if (!isset(self::$_TSPEC)) {
+ self::$_TSPEC = array(
+ 1 => array(
+ 'var' => 'dbName',
+ 'type' => TType::STRING,
+ ),
+ 2 => array(
+ 'var' => 'tableName',
+ 'type' => TType::STRING,
+ ),
+ 3 => array(
+ 'var' => 'txnId',
+ 'type' => TType::I64,
+ ),
+ );
+ }
+ if (is_array($vals)) {
+ if (isset($vals['dbName'])) {
+ $this->dbName = $vals['dbName'];
+ }
+ if (isset($vals['tableName'])) {
+ $this->tableName = $vals['tableName'];
+ }
+ if (isset($vals['txnId'])) {
+ $this->txnId = $vals['txnId'];
+ }
+ }
+ }
+
+ public function getName() {
+ return 'ThriftHiveMetastore_get_lock_materialization_rebuild_args';
+ }
+
+ public function read($input)
+ {
+ $xfer = 0;
+ $fname = null;
+ $ftype = 0;
+ $fid = 0;
+ $xfer += $input->readStructBegin($fname);
+ while (true)
+ {
+ $xfer += $input->readFieldBegin($fname, $ftype, $fid);
+ if ($ftype == TType::STOP) {
+ break;
+ }
+ switch ($fid)
+ {
+ case 1:
+ if ($ftype == TType::STRING) {
+ $xfer += $input->readString($this->dbName);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
+ case 2:
+ if ($ftype == TType::STRING) {
+ $xfer += $input->readString($this->tableName);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
+ case 3:
+ if ($ftype == TType::I64) {
+ $xfer += $input->readI64($this->txnId);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
+ default:
+ $xfer += $input->skip($ftype);
+ break;
+ }
+ $xfer += $input->readFieldEnd();
+ }
+ $xfer += $input->readStructEnd();
+ return $xfer;
+ }
+
+ public function write($output) {
+ $xfer = 0;
+ $xfer += $output->writeStructBegin('ThriftHiveMetastore_get_lock_materialization_rebuild_args');
+ if ($this->dbName !== null) {
+ $xfer += $output->writeFieldBegin('dbName', TType::STRING, 1);
+ $xfer += $output->writeString($this->dbName);
+ $xfer += $output->writeFieldEnd();
+ }
+ if ($this->tableName !== null) {
+ $xfer += $output->writeFieldBegin('tableName', TType::STRING, 2);
+ $xfer += $output->writeString($this->tableName);
+ $xfer += $output->writeFieldEnd();
+ }
+ if ($this->txnId !== null) {
+ $xfer += $output->writeFieldBegin('txnId', TType::I64, 3);
+ $xfer += $output->writeI64($this->txnId);
+ $xfer += $output->writeFieldEnd();
+ }
+ $xfer += $output->writeFieldStop();
+ $xfer += $output->writeStructEnd();
+ return $xfer;
+ }
+
+}
+
+class ThriftHiveMetastore_get_lock_materialization_rebuild_result {
+ static $_TSPEC;
+
+ /**
+ * @var \metastore\LockResponse
+ */
+ public $success = null;
+
+ public function __construct($vals=null) {
+ if (!isset(self::$_TSPEC)) {
+ self::$_TSPEC = array(
+ 0 => array(
+ 'var' => 'success',
+ 'type' => TType::STRUCT,
+ 'class' => '\metastore\LockResponse',
+ ),
+ );
+ }
+ if (is_array($vals)) {
+ if (isset($vals['success'])) {
+ $this->success = $vals['success'];
+ }
+ }
+ }
+
+ public function getName() {
+ return 'ThriftHiveMetastore_get_lock_materialization_rebuild_result';
+ }
+
+ public function read($input)
+ {
+ $xfer = 0;
+ $fname = null;
+ $ftype = 0;
+ $fid = 0;
+ $xfer += $input->readStructBegin($fname);
+ while (true)
+ {
+ $xfer += $input->readFieldBegin($fname, $ftype, $fid);
+ if ($ftype == TType::STOP) {
+ break;
+ }
+ switch ($fid)
+ {
+ case 0:
+ if ($ftype == TType::STRUCT) {
+ $this->success = new \metastore\LockResponse();
+ $xfer += $this->success->read($input);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
+ default:
+ $xfer += $input->skip($ftype);
+ break;
+ }
+ $xfer += $input->readFieldEnd();
+ }
+ $xfer += $input->readStructEnd();
+ return $xfer;
+ }
+
+ public function write($output) {
+ $xfer = 0;
+ $xfer += $output->writeStructBegin('ThriftHiveMetastore_get_lock_materialization_rebuild_result');
+ if ($this->success !== null) {
+ if (!is_object($this->success)) {
+ throw new TProtocolException('Bad type in structure.', TProtocolException::INVALID_DATA);
+ }
+ $xfer += $output->writeFieldBegin('success', TType::STRUCT, 0);
+ $xfer += $this->success->write($output);
+ $xfer += $output->writeFieldEnd();
+ }
+ $xfer += $output->writeFieldStop();
+ $xfer += $output->writeStructEnd();
+ return $xfer;
+ }
+
+}
+
+class ThriftHiveMetastore_heartbeat_lock_materialization_rebuild_args {
+ static $_TSPEC;
+
+ /**
+ * @var string
+ */
+ public $dbName = null;
+ /**
+ * @var string
+ */
+ public $tableName = null;
+ /**
+ * @var int
+ */
+ public $txnId = null;
+
+ public function __construct($vals=null) {
+ if (!isset(self::$_TSPEC)) {
+ self::$_TSPEC = array(
+ 1 => array(
+ 'var' => 'dbName',
+ 'type' => TType::STRING,
+ ),
+ 2 => array(
+ 'var' => 'tableName',
+ 'type' => TType::STRING,
+ ),
+ 3 => array(
+ 'var' => 'txnId',
+ 'type' => TType::I64,
+ ),
+ );
+ }
+ if (is_array($vals)) {
+ if (isset($vals['dbName'])) {
+ $this->dbName = $vals['dbName'];
+ }
+ if (isset($vals['tableName'])) {
+ $this->tableName = $vals['tableName'];
+ }
+ if (isset($vals['txnId'])) {
+ $this->txnId = $vals['txnId'];
+ }
+ }
+ }
+
+ public function getName() {
+ return 'ThriftHiveMetastore_heartbeat_lock_materialization_rebuild_args';
+ }
+
+ public function read($input)
+ {
+ $xfer = 0;
+ $fname = null;
+ $ftype = 0;
+ $fid = 0;
+ $xfer += $input->readStructBegin($fname);
+ while (true)
+ {
+ $xfer += $input->readFieldBegin($fname, $ftype, $fid);
+ if ($ftype == TType::STOP) {
+ break;
+ }
+ switch ($fid)
+ {
+ case 1:
+ if ($ftype == TType::STRING) {
+ $xfer += $input->readString($this->dbName);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
+ case 2:
+ if ($ftype == TType::STRING) {
+ $xfer += $input->readString($this->tableName);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
+ case 3:
+ if ($ftype == TType::I64) {
+ $xfer += $input->readI64($this->txnId);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
+ default:
+ $xfer += $input->skip($ftype);
+ break;
+ }
+ $xfer += $input->readFieldEnd();
+ }
+ $xfer += $input->readStructEnd();
+ return $xfer;
+ }
+
+ public function write($output) {
+ $xfer = 0;
+ $xfer += $output->writeStructBegin('ThriftHiveMetastore_heartbeat_lock_materialization_rebuild_args');
+ if ($this->dbName !== null) {
+ $xfer += $output->writeFieldBegin('dbName', TType::STRING, 1);
+ $xfer += $output->writeString($this->dbName);
+ $xfer += $output->writeFieldEnd();
+ }
+ if ($this->tableName !== null) {
+ $xfer += $output->writeFieldBegin('tableName', TType::STRING, 2);
+ $xfer += $output->writeString($this->tableName);
+ $xfer += $output->writeFieldEnd();
+ }
+ if ($this->txnId !== null) {
+ $xfer += $output->writeFieldBegin('txnId', TType::I64, 3);
+ $xfer += $output->writeI64($this->txnId);
+ $xfer += $output->writeFieldEnd();
+ }
+ $xfer += $output->writeFieldStop();
+ $xfer += $output->writeStructEnd();
+ return $xfer;
+ }
+
+}
+
+class ThriftHiveMetastore_heartbeat_lock_materialization_rebuild_result {
+ static $_TSPEC;
+
+ /**
+ * @var bool
+ */
+ public $success = null;
+
+ public function __construct($vals=null) {
+ if (!isset(self::$_TSPEC)) {
+ self::$_TSPEC = array(
+ 0 => array(
+ 'var' => 'success',
+ 'type' => TType::BOOL,
+ ),
+ );
+ }
+ if (is_array($vals)) {
+ if (isset($vals['success'])) {
+ $this->success = $vals['success'];
+ }
+ }
+ }
+
+ public function getName() {
+ return 'ThriftHiveMetastore_heartbeat_lock_materialization_rebuild_result';
+ }
+
+ public function read($input)
+ {
+ $xfer = 0;
+ $fname = null;
+ $ftype = 0;
+ $fid = 0;
+ $xfer += $input->readStructBegin($fname);
+ while (true)
+ {
+ $xfer += $input->readFieldBegin($fname, $ftype, $fid);
+ if ($ftype == TType::STOP) {
+ break;
+ }
+ switch ($fid)
+ {
+ case 0:
+ if ($ftype == TType::BOOL) {
+ $xfer += $input->readBool($this->success);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
+ default:
+ $xfer += $input->skip($ftype);
+ break;
+ }
+ $xfer += $input->readFieldEnd();
+ }
+ $xfer += $input->readStructEnd();
+ return $xfer;
+ }
+
+ public function write($output) {
+ $xfer = 0;
+ $xfer += $output->writeStructBegin('ThriftHiveMetastore_heartbeat_lock_materialization_rebuild_result');
+ if ($this->success !== null) {
+ $xfer += $output->writeFieldBegin('success', TType::BOOL, 0);
+ $xfer += $output->writeBool($this->success);
+ $xfer += $output->writeFieldEnd();
+ }
+ $xfer += $output->writeFieldStop();
+ $xfer += $output->writeStructEnd();
+ return $xfer;
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/gen/thrift/gen-php/metastore/Types.php
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/gen/thrift/gen-php/metastore/Types.php b/standalone-metastore/src/gen/thrift/gen-php/metastore/Types.php
index d4fcc88..14416b4 100644
--- a/standalone-metastore/src/gen/thrift/gen-php/metastore/Types.php
+++ b/standalone-metastore/src/gen/thrift/gen-php/metastore/Types.php
@@ -23975,6 +23975,10 @@ class Materialization {
* @var int
*/
public $invalidationTime = null;
+ /**
+ * @var bool
+ */
+ public $sourceTablesUpdateDeleteModified = null;
public function __construct($vals=null) {
if (!isset(self::$_TSPEC)) {
@@ -23995,6 +23999,10 @@ class Materialization {
'var' => 'invalidationTime',
'type' => TType::I64,
),
+ 4 => array(
+ 'var' => 'sourceTablesUpdateDeleteModified',
+ 'type' => TType::BOOL,
+ ),
);
}
if (is_array($vals)) {
@@ -24007,6 +24015,9 @@ class Materialization {
if (isset($vals['invalidationTime'])) {
$this->invalidationTime = $vals['invalidationTime'];
}
+ if (isset($vals['sourceTablesUpdateDeleteModified'])) {
+ $this->sourceTablesUpdateDeleteModified = $vals['sourceTablesUpdateDeleteModified'];
+ }
}
}
@@ -24064,6 +24075,13 @@ class Materialization {
$xfer += $input->skip($ftype);
}
break;
+ case 4:
+ if ($ftype == TType::BOOL) {
+ $xfer += $input->readBool($this->sourceTablesUpdateDeleteModified);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
default:
$xfer += $input->skip($ftype);
break;
@@ -24108,6 +24126,11 @@ class Materialization {
$xfer += $output->writeI64($this->invalidationTime);
$xfer += $output->writeFieldEnd();
}
+ if ($this->sourceTablesUpdateDeleteModified !== null) {
+ $xfer += $output->writeFieldBegin('sourceTablesUpdateDeleteModified', TType::BOOL, 4);
+ $xfer += $output->writeBool($this->sourceTablesUpdateDeleteModified);
+ $xfer += $output->writeFieldEnd();
+ }
$xfer += $output->writeFieldStop();
$xfer += $output->writeStructEnd();
return $xfer;
http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore-remote
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore-remote b/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore-remote
index d39690f..079c7fc 100755
--- a/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore-remote
+++ b/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore-remote
@@ -224,6 +224,8 @@ if len(sys.argv) <= 1 or sys.argv[1] == '--help':
print(' void set_schema_version_state(SetSchemaVersionStateRequest rqst)')
print(' void add_serde(SerDeInfo serde)')
print(' SerDeInfo get_serde(GetSerdeRequest rqst)')
+ print(' LockResponse get_lock_materialization_rebuild(string dbName, string tableName, i64 txnId)')
+ print(' bool heartbeat_lock_materialization_rebuild(string dbName, string tableName, i64 txnId)')
print(' string getName()')
print(' string getVersion()')
print(' fb_status getStatus()')
@@ -1493,6 +1495,18 @@ elif cmd == 'get_serde':
sys.exit(1)
pp.pprint(client.get_serde(eval(args[0]),))
+elif cmd == 'get_lock_materialization_rebuild':
+ if len(args) != 3:
+ print('get_lock_materialization_rebuild requires 3 args')
+ sys.exit(1)
+ pp.pprint(client.get_lock_materialization_rebuild(args[0],args[1],eval(args[2]),))
+
+elif cmd == 'heartbeat_lock_materialization_rebuild':
+ if len(args) != 3:
+ print('heartbeat_lock_materialization_rebuild requires 3 args')
+ sys.exit(1)
+ pp.pprint(client.heartbeat_lock_materialization_rebuild(args[0],args[1],eval(args[2]),))
+
elif cmd == 'getName':
if len(args) != 0:
print('getName requires 0 args')
http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore.py
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore.py b/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore.py
index f8ffeac..b0e64d8 100644
--- a/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore.py
+++ b/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ThriftHiveMetastore.py
@@ -1548,6 +1548,24 @@ class Iface(fb303.FacebookService.Iface):
"""
pass
+ def get_lock_materialization_rebuild(self, dbName, tableName, txnId):
+ """
+ Parameters:
+ - dbName
+ - tableName
+ - txnId
+ """
+ pass
+
+ def heartbeat_lock_materialization_rebuild(self, dbName, tableName, txnId):
+ """
+ Parameters:
+ - dbName
+ - tableName
+ - txnId
+ """
+ pass
+
class Client(fb303.FacebookService.Client, Iface):
"""
@@ -8711,6 +8729,76 @@ class Client(fb303.FacebookService.Client, Iface):
raise result.o2
raise TApplicationException(TApplicationException.MISSING_RESULT, "get_serde failed: unknown result")
+ def get_lock_materialization_rebuild(self, dbName, tableName, txnId):
+ """
+ Parameters:
+ - dbName
+ - tableName
+ - txnId
+ """
+ self.send_get_lock_materialization_rebuild(dbName, tableName, txnId)
+ return self.recv_get_lock_materialization_rebuild()
+
+ def send_get_lock_materialization_rebuild(self, dbName, tableName, txnId):
+ self._oprot.writeMessageBegin('get_lock_materialization_rebuild', TMessageType.CALL, self._seqid)
+ args = get_lock_materialization_rebuild_args()
+ args.dbName = dbName
+ args.tableName = tableName
+ args.txnId = txnId
+ args.write(self._oprot)
+ self._oprot.writeMessageEnd()
+ self._oprot.trans.flush()
+
+ def recv_get_lock_materialization_rebuild(self):
+ iprot = self._iprot
+ (fname, mtype, rseqid) = iprot.readMessageBegin()
+ if mtype == TMessageType.EXCEPTION:
+ x = TApplicationException()
+ x.read(iprot)
+ iprot.readMessageEnd()
+ raise x
+ result = get_lock_materialization_rebuild_result()
+ result.read(iprot)
+ iprot.readMessageEnd()
+ if result.success is not None:
+ return result.success
+ raise TApplicationException(TApplicationException.MISSING_RESULT, "get_lock_materialization_rebuild failed: unknown result")
+
+ def heartbeat_lock_materialization_rebuild(self, dbName, tableName, txnId):
+ """
+ Parameters:
+ - dbName
+ - tableName
+ - txnId
+ """
+ self.send_heartbeat_lock_materialization_rebuild(dbName, tableName, txnId)
+ return self.recv_heartbeat_lock_materialization_rebuild()
+
+ def send_heartbeat_lock_materialization_rebuild(self, dbName, tableName, txnId):
+ self._oprot.writeMessageBegin('heartbeat_lock_materialization_rebuild', TMessageType.CALL, self._seqid)
+ args = heartbeat_lock_materialization_rebuild_args()
+ args.dbName = dbName
+ args.tableName = tableName
+ args.txnId = txnId
+ args.write(self._oprot)
+ self._oprot.writeMessageEnd()
+ self._oprot.trans.flush()
+
+ def recv_heartbeat_lock_materialization_rebuild(self):
+ iprot = self._iprot
+ (fname, mtype, rseqid) = iprot.readMessageBegin()
+ if mtype == TMessageType.EXCEPTION:
+ x = TApplicationException()
+ x.read(iprot)
+ iprot.readMessageEnd()
+ raise x
+ result = heartbeat_lock_materialization_rebuild_result()
+ result.read(iprot)
+ iprot.readMessageEnd()
+ if result.success is not None:
+ return result.success
+ raise TApplicationException(TApplicationException.MISSING_RESULT, "heartbeat_lock_materialization_rebuild failed: unknown result")
+
class Processor(fb303.FacebookService.Processor, Iface, TProcessor):
def __init__(self, handler):
@@ -8915,6 +9003,8 @@ class Processor(fb303.FacebookService.Processor, Iface, TProcessor):
self._processMap["set_schema_version_state"] = Processor.process_set_schema_version_state
self._processMap["add_serde"] = Processor.process_add_serde
self._processMap["get_serde"] = Processor.process_get_serde
+ self._processMap["get_lock_materialization_rebuild"] = Processor.process_get_lock_materialization_rebuild
+ self._processMap["heartbeat_lock_materialization_rebuild"] = Processor.process_heartbeat_lock_materialization_rebuild
def process(self, iprot, oprot):
(name, type, seqid) = iprot.readMessageBegin()
@@ -13889,6 +13979,44 @@ class Processor(fb303.FacebookService.Processor, Iface, TProcessor):
oprot.writeMessageEnd()
oprot.trans.flush()
+ def process_get_lock_materialization_rebuild(self, seqid, iprot, oprot):
+ args = get_lock_materialization_rebuild_args()
+ args.read(iprot)
+ iprot.readMessageEnd()
+ result = get_lock_materialization_rebuild_result()
+ try:
+ result.success = self._handler.get_lock_materialization_rebuild(args.dbName, args.tableName, args.txnId)
+ msg_type = TMessageType.REPLY
+ except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+ raise
+ except Exception as ex:
+ msg_type = TMessageType.EXCEPTION
+ logging.exception(ex)
+ result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+ oprot.writeMessageBegin("get_lock_materialization_rebuild", msg_type, seqid)
+ result.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
+
+ def process_heartbeat_lock_materialization_rebuild(self, seqid, iprot, oprot):
+ args = heartbeat_lock_materialization_rebuild_args()
+ args.read(iprot)
+ iprot.readMessageEnd()
+ result = heartbeat_lock_materialization_rebuild_result()
+ try:
+ result.success = self._handler.heartbeat_lock_materialization_rebuild(args.dbName, args.tableName, args.txnId)
+ msg_type = TMessageType.REPLY
+ except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+ raise
+ except Exception as ex:
+ msg_type = TMessageType.EXCEPTION
+ logging.exception(ex)
+ result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+ oprot.writeMessageBegin("heartbeat_lock_materialization_rebuild", msg_type, seqid)
+ result.write(oprot)
+ oprot.writeMessageEnd()
+ oprot.trans.flush()
+
# HELPER FUNCTIONS AND STRUCTURES
@@ -47281,3 +47409,314 @@ class get_serde_result:
def __ne__(self, other):
return not (self == other)
+
+class get_lock_materialization_rebuild_args:
+ """
+ Attributes:
+ - dbName
+ - tableName
+ - txnId
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.STRING, 'dbName', None, None, ), # 1
+ (2, TType.STRING, 'tableName', None, None, ), # 2
+ (3, TType.I64, 'txnId', None, None, ), # 3
+ )
+
+ def __init__(self, dbName=None, tableName=None, txnId=None,):
+ self.dbName = dbName
+ self.tableName = tableName
+ self.txnId = txnId
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.STRING:
+ self.dbName = iprot.readString()
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.STRING:
+ self.tableName = iprot.readString()
+ else:
+ iprot.skip(ftype)
+ elif fid == 3:
+ if ftype == TType.I64:
+ self.txnId = iprot.readI64()
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('get_lock_materialization_rebuild_args')
+ if self.dbName is not None:
+ oprot.writeFieldBegin('dbName', TType.STRING, 1)
+ oprot.writeString(self.dbName)
+ oprot.writeFieldEnd()
+ if self.tableName is not None:
+ oprot.writeFieldBegin('tableName', TType.STRING, 2)
+ oprot.writeString(self.tableName)
+ oprot.writeFieldEnd()
+ if self.txnId is not None:
+ oprot.writeFieldBegin('txnId', TType.I64, 3)
+ oprot.writeI64(self.txnId)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ return
+
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.dbName)
+ value = (value * 31) ^ hash(self.tableName)
+ value = (value * 31) ^ hash(self.txnId)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class get_lock_materialization_rebuild_result:
+ """
+ Attributes:
+ - success
+ """
+
+ thrift_spec = (
+ (0, TType.STRUCT, 'success', (LockResponse, LockResponse.thrift_spec), None, ), # 0
+ )
+
+ def __init__(self, success=None,):
+ self.success = success
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 0:
+ if ftype == TType.STRUCT:
+ self.success = LockResponse()
+ self.success.read(iprot)
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('get_lock_materialization_rebuild_result')
+ if self.success is not None:
+ oprot.writeFieldBegin('success', TType.STRUCT, 0)
+ self.success.write(oprot)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ return
+
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.success)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class heartbeat_lock_materialization_rebuild_args:
+ """
+ Attributes:
+ - dbName
+ - tableName
+ - txnId
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.STRING, 'dbName', None, None, ), # 1
+ (2, TType.STRING, 'tableName', None, None, ), # 2
+ (3, TType.I64, 'txnId', None, None, ), # 3
+ )
+
+ def __init__(self, dbName=None, tableName=None, txnId=None,):
+ self.dbName = dbName
+ self.tableName = tableName
+ self.txnId = txnId
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.STRING:
+ self.dbName = iprot.readString()
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.STRING:
+ self.tableName = iprot.readString()
+ else:
+ iprot.skip(ftype)
+ elif fid == 3:
+ if ftype == TType.I64:
+ self.txnId = iprot.readI64()
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('heartbeat_lock_materialization_rebuild_args')
+ if self.dbName is not None:
+ oprot.writeFieldBegin('dbName', TType.STRING, 1)
+ oprot.writeString(self.dbName)
+ oprot.writeFieldEnd()
+ if self.tableName is not None:
+ oprot.writeFieldBegin('tableName', TType.STRING, 2)
+ oprot.writeString(self.tableName)
+ oprot.writeFieldEnd()
+ if self.txnId is not None:
+ oprot.writeFieldBegin('txnId', TType.I64, 3)
+ oprot.writeI64(self.txnId)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ return
+
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.dbName)
+ value = (value * 31) ^ hash(self.tableName)
+ value = (value * 31) ^ hash(self.txnId)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class heartbeat_lock_materialization_rebuild_result:
+ """
+ Attributes:
+ - success
+ """
+
+ thrift_spec = (
+ (0, TType.BOOL, 'success', None, None, ), # 0
+ )
+
+ def __init__(self, success=None,):
+ self.success = success
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 0:
+ if ftype == TType.BOOL:
+ self.success = iprot.readBool()
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('heartbeat_lock_materialization_rebuild_result')
+ if self.success is not None:
+ oprot.writeFieldBegin('success', TType.BOOL, 0)
+ oprot.writeBool(self.success)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ return
+
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.success)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index 972db1f..f2f61e0 100644
--- a/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -16941,6 +16941,7 @@ class Materialization:
- tablesUsed
- validTxnList
- invalidationTime
+ - sourceTablesUpdateDeleteModified
"""
thrift_spec = (
@@ -16948,12 +16949,14 @@ class Materialization:
(1, TType.SET, 'tablesUsed', (TType.STRING,None), None, ), # 1
(2, TType.STRING, 'validTxnList', None, None, ), # 2
(3, TType.I64, 'invalidationTime', None, None, ), # 3
+ (4, TType.BOOL, 'sourceTablesUpdateDeleteModified', None, None, ), # 4
)
- def __init__(self, tablesUsed=None, validTxnList=None, invalidationTime=None,):
+ def __init__(self, tablesUsed=None, validTxnList=None, invalidationTime=None, sourceTablesUpdateDeleteModified=None,):
self.tablesUsed = tablesUsed
self.validTxnList = validTxnList
self.invalidationTime = invalidationTime
+ self.sourceTablesUpdateDeleteModified = sourceTablesUpdateDeleteModified
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -16984,6 +16987,11 @@ class Materialization:
self.invalidationTime = iprot.readI64()
else:
iprot.skip(ftype)
+ elif fid == 4:
+ if ftype == TType.BOOL:
+ self.sourceTablesUpdateDeleteModified = iprot.readBool()
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -17009,14 +17017,16 @@ class Materialization:
oprot.writeFieldBegin('invalidationTime', TType.I64, 3)
oprot.writeI64(self.invalidationTime)
oprot.writeFieldEnd()
+ if self.sourceTablesUpdateDeleteModified is not None:
+ oprot.writeFieldBegin('sourceTablesUpdateDeleteModified', TType.BOOL, 4)
+ oprot.writeBool(self.sourceTablesUpdateDeleteModified)
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
def validate(self):
if self.tablesUsed is None:
raise TProtocol.TProtocolException(message='Required field tablesUsed is unset!')
- if self.invalidationTime is None:
- raise TProtocol.TProtocolException(message='Required field invalidationTime is unset!')
return
@@ -17025,6 +17035,7 @@ class Materialization:
value = (value * 31) ^ hash(self.tablesUsed)
value = (value * 31) ^ hash(self.validTxnList)
value = (value * 31) ^ hash(self.invalidationTime)
+ value = (value * 31) ^ hash(self.sourceTablesUpdateDeleteModified)
return value
def __repr__(self):
http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
index 94454a1..0e70e89 100644
--- a/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -3814,18 +3814,19 @@ class Materialization
TABLESUSED = 1
VALIDTXNLIST = 2
INVALIDATIONTIME = 3
+ SOURCETABLESUPDATEDELETEMODIFIED = 4
FIELDS = {
TABLESUSED => {:type => ::Thrift::Types::SET, :name => 'tablesUsed', :element => {:type => ::Thrift::Types::STRING}},
VALIDTXNLIST => {:type => ::Thrift::Types::STRING, :name => 'validTxnList', :optional => true},
- INVALIDATIONTIME => {:type => ::Thrift::Types::I64, :name => 'invalidationTime'}
+ INVALIDATIONTIME => {:type => ::Thrift::Types::I64, :name => 'invalidationTime', :optional => true},
+ SOURCETABLESUPDATEDELETEMODIFIED => {:type => ::Thrift::Types::BOOL, :name => 'sourceTablesUpdateDeleteModified', :optional => true}
}
def struct_fields; FIELDS; end
def validate
raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field tablesUsed is unset!') unless @tablesUsed
- raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field invalidationTime is unset!') unless @invalidationTime
end
::Thrift::Struct.generate_accessors self
http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb b/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
index c103675..58ebd29 100644
--- a/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
+++ b/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
@@ -3348,6 +3348,36 @@ module ThriftHiveMetastore
raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'get_serde failed: unknown result')
end
+ def get_lock_materialization_rebuild(dbName, tableName, txnId)
+ send_get_lock_materialization_rebuild(dbName, tableName, txnId)
+ return recv_get_lock_materialization_rebuild()
+ end
+
+ def send_get_lock_materialization_rebuild(dbName, tableName, txnId)
+ send_message('get_lock_materialization_rebuild', Get_lock_materialization_rebuild_args, :dbName => dbName, :tableName => tableName, :txnId => txnId)
+ end
+
+ def recv_get_lock_materialization_rebuild()
+ result = receive_message(Get_lock_materialization_rebuild_result)
+ return result.success unless result.success.nil?
+ raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'get_lock_materialization_rebuild failed: unknown result')
+ end
+
+ def heartbeat_lock_materialization_rebuild(dbName, tableName, txnId)
+ send_heartbeat_lock_materialization_rebuild(dbName, tableName, txnId)
+ return recv_heartbeat_lock_materialization_rebuild()
+ end
+
+ def send_heartbeat_lock_materialization_rebuild(dbName, tableName, txnId)
+ send_message('heartbeat_lock_materialization_rebuild', Heartbeat_lock_materialization_rebuild_args, :dbName => dbName, :tableName => tableName, :txnId => txnId)
+ end
+
+ def recv_heartbeat_lock_materialization_rebuild()
+ result = receive_message(Heartbeat_lock_materialization_rebuild_result)
+ return result.success unless result.success.nil?
+ raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'heartbeat_lock_materialization_rebuild failed: unknown result')
+ end
+
end
class Processor < ::FacebookService::Processor
@@ -5875,6 +5905,20 @@ module ThriftHiveMetastore
write_result(result, oprot, 'get_serde', seqid)
end
+ def process_get_lock_materialization_rebuild(seqid, iprot, oprot)
+ args = read_args(iprot, Get_lock_materialization_rebuild_args)
+ result = Get_lock_materialization_rebuild_result.new()
+ result.success = @handler.get_lock_materialization_rebuild(args.dbName, args.tableName, args.txnId)
+ write_result(result, oprot, 'get_lock_materialization_rebuild', seqid)
+ end
+
+ def process_heartbeat_lock_materialization_rebuild(seqid, iprot, oprot)
+ args = read_args(iprot, Heartbeat_lock_materialization_rebuild_args)
+ result = Heartbeat_lock_materialization_rebuild_result.new()
+ result.success = @handler.heartbeat_lock_materialization_rebuild(args.dbName, args.tableName, args.txnId)
+ write_result(result, oprot, 'heartbeat_lock_materialization_rebuild', seqid)
+ end
+
end
# HELPER FUNCTIONS AND STRUCTURES
@@ -13301,5 +13345,77 @@ module ThriftHiveMetastore
::Thrift::Struct.generate_accessors self
end
+ class Get_lock_materialization_rebuild_args
+ include ::Thrift::Struct, ::Thrift::Struct_Union
+ DBNAME = 1
+ TABLENAME = 2
+ TXNID = 3
+
+ FIELDS = {
+ DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName'},
+ TABLENAME => {:type => ::Thrift::Types::STRING, :name => 'tableName'},
+ TXNID => {:type => ::Thrift::Types::I64, :name => 'txnId'}
+ }
+
+ def struct_fields; FIELDS; end
+
+ def validate
+ end
+
+ ::Thrift::Struct.generate_accessors self
+ end
+
+ class Get_lock_materialization_rebuild_result
+ include ::Thrift::Struct, ::Thrift::Struct_Union
+ SUCCESS = 0
+
+ FIELDS = {
+ SUCCESS => {:type => ::Thrift::Types::STRUCT, :name => 'success', :class => ::LockResponse}
+ }
+
+ def struct_fields; FIELDS; end
+
+ def validate
+ end
+
+ ::Thrift::Struct.generate_accessors self
+ end
+
+ class Heartbeat_lock_materialization_rebuild_args
+ include ::Thrift::Struct, ::Thrift::Struct_Union
+ DBNAME = 1
+ TABLENAME = 2
+ TXNID = 3
+
+ FIELDS = {
+ DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName'},
+ TABLENAME => {:type => ::Thrift::Types::STRING, :name => 'tableName'},
+ TXNID => {:type => ::Thrift::Types::I64, :name => 'txnId'}
+ }
+
+ def struct_fields; FIELDS; end
+
+ def validate
+ end
+
+ ::Thrift::Struct.generate_accessors self
+ end
+
+ class Heartbeat_lock_materialization_rebuild_result
+ include ::Thrift::Struct, ::Thrift::Struct_Union
+ SUCCESS = 0
+
+ FIELDS = {
+ SUCCESS => {:type => ::Thrift::Types::BOOL, :name => 'success'}
+ }
+
+ def struct_fields; FIELDS; end
+
+ def validate
+ end
+
+ ::Thrift::Struct.generate_accessors self
+ end
+
end
http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index c81b8fa..30922ba 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -8367,6 +8367,18 @@ public class HiveMetaStore extends ThriftHiveMetastore {
endFunction("get_serde", serde != null, ex);
}
}
+
+ @Override
+ public LockResponse get_lock_materialization_rebuild(String dbName, String tableName, long txnId)
+ throws TException {
+ return MaterializationsRebuildLockHandler.get().lockResource(dbName, tableName, txnId);
+ }
+
+ @Override
+ public boolean heartbeat_lock_materialization_rebuild(String dbName, String tableName, long txnId)
+ throws TException {
+ return MaterializationsRebuildLockHandler.get().refreshLockResource(dbName, tableName, txnId);
+ }
}
private static IHMSHandler newRetryingHMSHandler(IHMSHandler baseHandler, Configuration conf)
http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index ebbf465..95a3767 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -3127,4 +3127,14 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
else if (max <= Short.MAX_VALUE) return (short)max;
else return Short.MAX_VALUE;
}
+
+ @Override
+ public LockResponse lockMaterializationRebuild(String dbName, String tableName, long txnId) throws TException {
+ return client.get_lock_materialization_rebuild(dbName, tableName, txnId);
+ }
+
+ @Override
+ public boolean heartbeatLockMaterializationRebuild(String dbName, String tableName, long txnId) throws TException {
+ return client.heartbeat_lock_materialization_rebuild(dbName, tableName, txnId);
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index b2c40c2..98674cf 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -3613,4 +3613,25 @@ public interface IMetaStoreClient {
* @throws TException general thrift error
*/
SerDeInfo getSerDe(String serDeName) throws TException;
+
+ /**
+ * Acquire the materialization rebuild lock for a given view. We need to specify the fully
+ * qualified name of the materialized view and the open transaction ID so we can uniquely
+ * identify the lock.
+ * @param dbName db name for the materialized view
+ * @param tableName table name for the materialized view
+ * @param txnId transaction id for the rebuild
+ * @return the response from the metastore, where the lock id is equal to the txn id and
+ * the status can be either ACQUIRED or NOT ACQUIRED
+ */
+ LockResponse lockMaterializationRebuild(String dbName, String tableName, long txnId) throws TException;
+
+ /**
+ * Method to refresh the acquisition of a given materialization rebuild lock.
+ * @param dbName db name for the materialized view
+ * @param tableName table name for the materialized view
+ * @param txnId transaction id for the rebuild
+ * @return true if the lock could be renewed, false otherwise
+ */
+ boolean heartbeatLockMaterializationRebuild(String dbName, String tableName, long txnId) throws TException;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationInvalidationInfo.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationInvalidationInfo.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationInvalidationInfo.java
deleted file mode 100644
index 3d77407..0000000
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationInvalidationInfo.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.metastore;
-
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.hadoop.hive.metastore.api.Materialization;
-import org.apache.hadoop.hive.metastore.api.Table;
-
-/**
- * Contains information about the invalidation of a materialization,
- * including the materialization name, the tables that it uses, and
- * the invalidation time, i.e., the first moment t0 after the
- * materialization was created at which one of the tables that it uses
- * was modified.
- */
-@SuppressWarnings("serial")
-public class MaterializationInvalidationInfo extends Materialization {
-
- private AtomicLong invalidationTime;
-
- public MaterializationInvalidationInfo(Set<String> tablesUsed, String validTxnList) {
- super(tablesUsed, 0);
- this.setValidTxnList(validTxnList);
- this.invalidationTime = new AtomicLong(0);
- }
-
- public boolean compareAndSetInvalidationTime(long expect, long update) {
- boolean success = invalidationTime.compareAndSet(expect, update);
- if (success) {
- super.setInvalidationTime(update);
- }
- return success;
- }
-
- public long getInvalidationTime() {
- return invalidationTime.get();
- }
-
- public void setInvalidationTime(long invalidationTime) {
- throw new UnsupportedOperationException("You should call compareAndSetInvalidationTime instead");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java
index 80cb1de..99c5abc 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java
@@ -26,15 +26,17 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.ValidReadTxnList;
-import org.apache.hadoop.hive.common.ValidTxnList;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
+import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.metastore.api.BasicTxnInfo;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.Materialization;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
@@ -65,10 +67,12 @@ public final class MaterializationsInvalidationCache {
/* Key is the database name. Each value is a map from the unique view qualified name to
* the materialization invalidation info. This invalidation object contains information
- * such as the tables used by the materialized view or the invalidation time, i.e., first
- * modification of the tables used by materialized view after the view was created. */
- private final ConcurrentMap<String, ConcurrentMap<String, MaterializationInvalidationInfo>> materializations =
- new ConcurrentHashMap<String, ConcurrentMap<String, MaterializationInvalidationInfo>>();
+ * such as the tables used by the materialized view, whether there was any update or
+ * delete in the source tables since the materialized view was created or rebuilt,
+ * or the invalidation time, i.e., first modification of the tables used by materialized
+ * view after the view was created. */
+ private final ConcurrentMap<String, ConcurrentMap<String, Materialization>> materializations =
+ new ConcurrentHashMap<>();
/*
* Key is a qualified table name. The value is a (sorted) tree map (supporting concurrent
@@ -77,7 +81,10 @@ public final class MaterializationsInvalidationCache {
* materialization.
*/
private final ConcurrentMap<String, ConcurrentSkipListMap<Long, Long>> tableModifications =
- new ConcurrentHashMap<String, ConcurrentSkipListMap<Long, Long>>();
+ new ConcurrentHashMap<>();
+
+ private final ConcurrentMap<String, ConcurrentSkipListSet<Long>> updateDeleteTableModifications =
+ new ConcurrentHashMap<>();
/* Whether the cache has been initialized or not. */
private boolean initialized;
@@ -188,9 +195,9 @@ public final class MaterializationsInvalidationCache {
return;
}
// We are going to create the map for each view in the given database
- ConcurrentMap<String, MaterializationInvalidationInfo> cq =
- new ConcurrentHashMap<String, MaterializationInvalidationInfo>();
- final ConcurrentMap<String, MaterializationInvalidationInfo> prevCq = materializations.putIfAbsent(
+ ConcurrentMap<String, Materialization> cq =
+ new ConcurrentHashMap<String, Materialization>();
+ final ConcurrentMap<String, Materialization> prevCq = materializations.putIfAbsent(
dbName, cq);
if (prevCq != null) {
cq = prevCq;
@@ -204,13 +211,15 @@ public final class MaterializationsInvalidationCache {
}
if (opType == OpType.CREATE || opType == OpType.ALTER) {
// You store the materialized view
- cq.put(tableName, new MaterializationInvalidationInfo(tablesUsed, validTxnList));
+ Materialization materialization = new Materialization(tablesUsed);
+ materialization.setValidTxnList(validTxnList);
+ cq.put(tableName, materialization);
} else {
- ValidTxnList txnList = new ValidReadTxnList(validTxnList);
+ ValidTxnWriteIdList txnList = new ValidTxnWriteIdList(validTxnList);
for (String qNameTableUsed : tablesUsed) {
+ ValidWriteIdList tableTxnList = txnList.getTableValidWriteIdList(qNameTableUsed);
// First we insert a new tree set to keep table modifications, unless it already exists
- ConcurrentSkipListMap<Long, Long> modificationsTree =
- new ConcurrentSkipListMap<Long, Long>();
+ ConcurrentSkipListMap<Long, Long> modificationsTree = new ConcurrentSkipListMap<>();
final ConcurrentSkipListMap<Long, Long> prevModificationsTree = tableModifications.putIfAbsent(
qNameTableUsed, modificationsTree);
if (prevModificationsTree != null) {
@@ -222,7 +231,7 @@ public final class MaterializationsInvalidationCache {
try {
String[] names = qNameTableUsed.split("\\.");
BasicTxnInfo e = handler.getTxnHandler().getFirstCompletedTransactionForTableAfterCommit(
- names[0], names[1], txnList);
+ names[0], names[1], tableTxnList);
if (!e.isIsnull()) {
modificationsTree.put(e.getTxnid(), e.getTime());
// We do not need to do anything more for current table, as we detected
@@ -236,7 +245,9 @@ public final class MaterializationsInvalidationCache {
}
}
// For LOAD, you only add it if it does exist as you might be loading an outdated MV
- cq.putIfAbsent(tableName, new MaterializationInvalidationInfo(tablesUsed, validTxnList));
+ Materialization materialization = new Materialization(tablesUsed);
+ materialization.setValidTxnList(validTxnList);
+ cq.putIfAbsent(tableName, materialization);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Cached materialized view for rewriting in invalidation cache: " +
@@ -249,7 +260,7 @@ public final class MaterializationsInvalidationCache {
* invalidation for the MVs that use that table.
*/
public void notifyTableModification(String dbName, String tableName,
- long txnId, long newModificationTime) {
+ long txnId, long newModificationTime, boolean isUpdateDelete) {
if (disable) {
// Nothing to do
return;
@@ -258,8 +269,18 @@ public final class MaterializationsInvalidationCache {
LOG.debug("Notification for table {} in database {} received -> id: {}, time: {}",
tableName, dbName, txnId, newModificationTime);
}
- ConcurrentSkipListMap<Long, Long> modificationsTree =
- new ConcurrentSkipListMap<Long, Long>();
+ if (isUpdateDelete) {
+ // First, we record the transaction in the update/delete modifications set
+ ConcurrentSkipListSet<Long> modificationsSet = new ConcurrentSkipListSet<>();
+ final ConcurrentSkipListSet<Long> prevModificationsSet =
+ updateDeleteTableModifications.putIfAbsent(Warehouse.getQualifiedName(dbName, tableName),
+ modificationsSet);
+ if (prevModificationsSet != null) {
+ modificationsSet = prevModificationsSet;
+ }
+ modificationsSet.add(txnId);
+ }
+ ConcurrentSkipListMap<Long, Long> modificationsTree = new ConcurrentSkipListMap<>();
final ConcurrentSkipListMap<Long, Long> prevModificationsTree =
tableModifications.putIfAbsent(Warehouse.getQualifiedName(dbName, tableName), modificationsTree);
if (prevModificationsTree != null) {
@@ -293,30 +314,21 @@ public final class MaterializationsInvalidationCache {
if (materializations.get(dbName) != null) {
ImmutableMap.Builder<String, Materialization> m = ImmutableMap.builder();
for (String materializationName : materializationNames) {
- MaterializationInvalidationInfo materialization =
+ Materialization materialization =
materializations.get(dbName).get(materializationName);
if (materialization == null) {
LOG.debug("Materialization {} skipped as there is no information "
+ "in the invalidation cache about it", materializationName);
continue;
}
- long invalidationTime = getInvalidationTime(materialization);
- // We need to check whether previous value is zero, as data modification
- // in another table used by the materialized view might have modified
- // the value too
- boolean modified = materialization.compareAndSetInvalidationTime(0L, invalidationTime);
- while (!modified) {
- long currentInvalidationTime = materialization.getInvalidationTime();
- if (invalidationTime < currentInvalidationTime) {
- // It was set by other table modification, but it was after this table modification
- // hence we need to set it
- modified = materialization.compareAndSetInvalidationTime(currentInvalidationTime, invalidationTime);
- } else {
- // Nothing to do
- modified = true;
- }
- }
- m.put(materializationName, materialization);
+ // We create a deep copy of the materialization, as we need to set the time
+ // and whether any update/delete operation happened on the tables that it uses
+ // since it was created.
+ Materialization materializationCopy = new Materialization(
+ materialization.getTablesUsed());
+ materializationCopy.setValidTxnList(materialization.getValidTxnList());
+ enrichWithInvalidationInfo(materializationCopy);
+ m.put(materializationName, materializationCopy);
}
Map<String, Materialization> result = m.build();
if (LOG.isDebugEnabled()) {
@@ -327,50 +339,65 @@ public final class MaterializationsInvalidationCache {
return ImmutableMap.of();
}
- private long getInvalidationTime(MaterializationInvalidationInfo materialization) {
- String txnListString = materialization.getValidTxnList();
- if (txnListString == null) {
+ private void enrichWithInvalidationInfo(Materialization materialization) {
+ String materializationTxnListString = materialization.getValidTxnList();
+ if (materializationTxnListString == null) {
// This can happen when the materialization was created on non-transactional tables
- return Long.MIN_VALUE;
+ materialization.setInvalidationTime(Long.MIN_VALUE);
+ return;
}
// We will obtain the modification time as follows.
// First, we obtain the first element after high watermark (if any)
// Then, we iterate through the elements from min open txn till high
// watermark, updating the modification time after creation if needed
- ValidTxnList txnList = new ValidReadTxnList(txnListString);
+ ValidTxnWriteIdList materializationTxnList = new ValidTxnWriteIdList(materializationTxnListString);
long firstModificationTimeAfterCreation = 0L;
+ boolean containsUpdateDelete = false;
for (String qNameTableUsed : materialization.getTablesUsed()) {
- final Entry<Long, Long> tn = tableModifications.get(qNameTableUsed)
- .higherEntry(txnList.getHighWatermark());
+ final ValidWriteIdList tableMaterializationTxnList =
+ materializationTxnList.getTableValidWriteIdList(qNameTableUsed);
+
+ final ConcurrentSkipListMap<Long, Long> usedTableModifications =
+ tableModifications.get(qNameTableUsed);
+ final ConcurrentSkipListSet<Long> usedUDTableModifications =
+ updateDeleteTableModifications.get(qNameTableUsed);
+ final Entry<Long, Long> tn = usedTableModifications.higherEntry(tableMaterializationTxnList.getHighWatermark());
if (tn != null) {
if (firstModificationTimeAfterCreation == 0L ||
tn.getValue() < firstModificationTimeAfterCreation) {
firstModificationTimeAfterCreation = tn.getValue();
}
+ // Check if there was any update/delete after creation; OR-accumulate so that a later table cannot reset the flag
+ containsUpdateDelete = containsUpdateDelete || (usedUDTableModifications != null &&
+ !usedUDTableModifications.tailSet(tableMaterializationTxnList.getHighWatermark(), false).isEmpty());
}
// Min open txn might be null if there were no open transactions
// when this transaction was being executed
- if (txnList.getMinOpenTxn() != null) {
+ if (tableMaterializationTxnList.getMinOpenWriteId() != null) {
// Invalid transaction list is sorted
int pos = 0;
- for (Map.Entry<Long, Long> t : tableModifications.get(qNameTableUsed)
- .subMap(txnList.getMinOpenTxn(), txnList.getHighWatermark()).entrySet()) {
- while (pos < txnList.getInvalidTransactions().length &&
- txnList.getInvalidTransactions()[pos] != t.getKey()) {
+ for (Map.Entry<Long, Long> t : usedTableModifications
+ .subMap(tableMaterializationTxnList.getMinOpenWriteId(), tableMaterializationTxnList.getHighWatermark()).entrySet()) {
+ while (pos < tableMaterializationTxnList.getInvalidWriteIds().length &&
+ tableMaterializationTxnList.getInvalidWriteIds()[pos] != t.getKey()) {
pos++;
}
- if (pos >= txnList.getInvalidTransactions().length) {
+ if (pos >= tableMaterializationTxnList.getInvalidWriteIds().length) {
break;
}
if (firstModificationTimeAfterCreation == 0L ||
t.getValue() < firstModificationTimeAfterCreation) {
firstModificationTimeAfterCreation = t.getValue();
}
+ containsUpdateDelete = containsUpdateDelete ||
+ (usedUDTableModifications != null && usedUDTableModifications.contains(t.getKey()));
}
}
}
- return firstModificationTimeAfterCreation;
+
+ materialization.setInvalidationTime(firstModificationTimeAfterCreation);
+ materialization.setSourceTablesUpdateDeleteModified(containsUpdateDelete);
}
private enum OpType {
@@ -395,16 +422,17 @@ public final class MaterializationsInvalidationCache {
// We execute the cleanup in two steps
// First we gather all the transactions that need to be kept
final Multimap<String, Long> keepTxnInfos = HashMultimap.create();
- for (Map.Entry<String, ConcurrentMap<String, MaterializationInvalidationInfo>> e : materializations.entrySet()) {
- for (MaterializationInvalidationInfo m : e.getValue().values()) {
- ValidTxnList txnList = new ValidReadTxnList(m.getValidTxnList());
+ for (Map.Entry<String, ConcurrentMap<String, Materialization>> e : materializations.entrySet()) {
+ for (Materialization m : e.getValue().values()) {
+ ValidTxnWriteIdList txnList = new ValidTxnWriteIdList(m.getValidTxnList());
boolean canBeDeleted = false;
String currentTableForInvalidatingTxn = null;
long currentInvalidatingTxnId = 0L;
long currentInvalidatingTxnTime = 0L;
for (String qNameTableUsed : m.getTablesUsed()) {
+ ValidWriteIdList tableTxnList = txnList.getTableValidWriteIdList(qNameTableUsed);
final Entry<Long, Long> tn = tableModifications.get(qNameTableUsed)
- .higherEntry(txnList.getHighWatermark());
+ .higherEntry(tableTxnList.getHighWatermark());
if (tn != null) {
if (currentInvalidatingTxnTime == 0L ||
tn.getValue() < currentInvalidatingTxnTime) {
@@ -424,16 +452,16 @@ public final class MaterializationsInvalidationCache {
currentInvalidatingTxnTime = tn.getValue();
}
}
- if (txnList.getMinOpenTxn() != null) {
+ if (tableTxnList.getMinOpenWriteId() != null) {
// Invalid transaction list is sorted
int pos = 0;
for (Entry<Long, Long> t : tableModifications.get(qNameTableUsed)
- .subMap(txnList.getMinOpenTxn(), txnList.getHighWatermark()).entrySet()) {
- while (pos < txnList.getInvalidTransactions().length &&
- txnList.getInvalidTransactions()[pos] != t.getKey()) {
+ .subMap(tableTxnList.getMinOpenWriteId(), tableTxnList.getHighWatermark()).entrySet()) {
+ while (pos < tableTxnList.getInvalidWriteIds().length &&
+ tableTxnList.getInvalidWriteIds()[pos] != t.getKey()) {
pos++;
}
- if (pos >= txnList.getInvalidTransactions().length) {
+ if (pos >= tableTxnList.getInvalidWriteIds().length) {
break;
}
if (currentInvalidatingTxnTime == 0L ||
@@ -462,6 +490,7 @@ public final class MaterializationsInvalidationCache {
long removed = 0L;
for (Entry<String, ConcurrentSkipListMap<Long, Long>> e : tableModifications.entrySet()) {
Collection<Long> c = keepTxnInfos.get(e.getKey());
+ ConcurrentSkipListSet<Long> updateDeleteForTable = updateDeleteTableModifications.get(e.getKey());
for (Iterator<Entry<Long, Long>> it = e.getValue().entrySet().iterator(); it.hasNext();) {
Entry<Long, Long> v = it.next();
// We need to check again the time because some of the transactions might not be explored
@@ -472,6 +501,9 @@ public final class MaterializationsInvalidationCache {
LOG.debug("Transaction removed from cache for table {} -> id: {}, time: {}",
e.getKey(), v.getKey(), v.getValue());
}
+ if (updateDeleteForTable != null) {
+ updateDeleteForTable.remove(v.getKey());
+ }
it.remove();
removed++;
}
@@ -480,4 +512,23 @@ public final class MaterializationsInvalidationCache {
return removed;
}
+ /**
+ * Checks whether the given materialization exists in the invalidation cache.
+ * @param dbName the database name for the materialization
+ * @param tblName the table name for the materialization
+ * @return true if we have information about the materialization in the cache,
+ * false otherwise
+ */
+  public boolean containsMaterialization(String dbName, String tblName) {
+    // Nothing can be looked up when the cache is disabled or the name is incomplete
+    if (disable || dbName == null || tblName == null) {
+      return false;
+    }
+    final ConcurrentMap<String, Materialization> perDatabase = materializations.get(dbName);
+    // A missing database or table entry means this is a table, not a
+    // materialization we hold information about
+    return perDatabase != null && perDatabase.get(tblName) != null;
+  }
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java
new file mode 100644
index 0000000..8ca9ede
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Cleaner for the {@link MaterializationsRebuildLockHandler}. It removes outdated locks
+ * in the intervals specified by the input property.
+ */
+public class MaterializationsRebuildLockCleanerTask implements MetastoreTaskThread {
+  private static final Logger LOG = LoggerFactory.getLogger(MaterializationsRebuildLockCleanerTask.class);
+
+  private Configuration conf;
+
+  /**
+   * Returns how often this task should run: half of the value configured for
+   * {@code TXN_TIMEOUT}, expressed in the requested unit.
+   *
+   * @param unit the time unit in which the frequency is returned
+   * @return the run frequency in the given unit
+   */
+  @Override
+  public long runFrequency(TimeUnit unit) {
+    return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_TIMEOUT, unit) / 2;
+  }
+
+  @Override
+  public void setConf(Configuration configuration) {
+    conf = configuration;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Asks the {@link MaterializationsRebuildLockHandler} to drop every lock whose
+   * last heartbeat is older than {@code TXN_TIMEOUT} and logs how many were removed.
+   */
+  @Override
+  public void run() {
+    final long timeoutMs =
+        MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS);
+    final long removedCnt = MaterializationsRebuildLockHandler.get().cleanupResourceLocks(timeoutMs);
+    if (removedCnt > 0) {
+      // The message used to be guarded by isDebugEnabled() but emitted at INFO;
+      // log at DEBUG so the level check and the call agree. Parameterized
+      // logging makes the explicit guard unnecessary.
+      LOG.debug("Number of materialization locks deleted: {}", removedCnt);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockHandler.java
new file mode 100644
index 0000000..dd31226
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockHandler.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This is a lock handler implementation for the materializations rebuild.
+ * It is lightweight: it does not persist any information to metastore db.
+ * Its states are as follows:
+ * 1) request lock -> 2) ACQUIRED -> 4) COMMIT_READY -> 6) release lock
+ * -> 5) EXPIRED ->
+ * -> 3) NOT_ACQUIRED
+ * First, the rebuild operation will ACQUIRE the lock. If another rebuild
+ * operation for the same materialization is already running, the lock status
+ * will be NOT_ACQUIRED.
+ * Before committing the rebuild, the txn handler will signal the handler
+ * that it is ready to commit the resource (move state to COMMIT_READY).
+ * We make sure the lock is still available before moving to the new state.
+ * A lock will not be able to expire when it is in COMMIT_READY state.
+ * The unlock method is always called by the txn handler, no matter whether
+ * the transaction succeeds or not, e.g., due to an Exception.
+ * From ACQUIRED, locks can be also moved to EXPIRED state when they
+ * expire. From EXPIRED, they can only be released.
+ */
+public class MaterializationsRebuildLockHandler {
+
+  /* Singleton */
+  private static final MaterializationsRebuildLockHandler SINGLETON = new MaterializationsRebuildLockHandler();
+
+  /* Current locks, keyed by Warehouse.getQualifiedName(dbName, tableName) */
+  private final ConcurrentMap<String, ResourceLock> locks = new ConcurrentHashMap<>();
+
+  private MaterializationsRebuildLockHandler() {
+  }
+
+  /**
+   * Get instance of MaterializationsRebuildLockHandler.
+   *
+   * @return the singleton
+   */
+  public static MaterializationsRebuildLockHandler get() {
+    return SINGLETON;
+  }
+
+  /**
+   * Lock materialized view (first step for rebuild). Response contains a lock id
+   * that corresponds to the input transaction id, and whether the lock was
+   * ACQUIRED or NOT_ACQUIRED.
+   * @param dbName the db name of the materialization
+   * @param tableName the table name of the materialization
+   * @param txnId the transaction id for the rebuild
+   * @return the response to the lock request
+   */
+  public LockResponse lockResource(String dbName, String tableName, long txnId) {
+    // The initial heartbeat must come from the same clock (wall-clock millis)
+    // that refreshLockResource and cleanupResourceLocks use. System.nanoTime()
+    // has an arbitrary origin and a different unit, so seeding the timestamp
+    // with it makes the expiration check for a never-heartbeated lock meaningless.
+    final ResourceLock prevResourceLock = locks.putIfAbsent(
+        Warehouse.getQualifiedName(dbName, tableName),
+        new ResourceLock(txnId, System.currentTimeMillis(), State.ACQUIRED));
+    if (prevResourceLock != null) {
+      // Some lock is already registered for this materialization
+      return new LockResponse(txnId, LockState.NOT_ACQUIRED);
+    }
+    return new LockResponse(txnId, LockState.ACQUIRED);
+  }
+
+  /**
+   * Moves from ACQUIRED state to COMMIT_READY.
+   * @param dbName the db name of the materialization
+   * @param tableName the table name of the materialization
+   * @param txnId the transaction id for the rebuild
+   * @return true if the lock was still active and we could move the materialization
+   * to COMMIT_READY state, false otherwise
+   */
+  public boolean readyToCommitResource(String dbName, String tableName, long txnId) {
+    final ResourceLock prevResourceLock = locks.get(Warehouse.getQualifiedName(dbName, tableName));
+    if (prevResourceLock == null || prevResourceLock.txnId != txnId) {
+      // Lock was outdated and it was removed (then maybe another transaction picked it up)
+      return false;
+    }
+    // Atomic transition: fails if the cleaner already moved the lock to EXPIRED
+    return prevResourceLock.state.compareAndSet(State.ACQUIRED, State.COMMIT_READY);
+  }
+
+  /**
+   * Heartbeats a certain lock and refreshes its timer.
+   * @param dbName the db name of the materialization
+   * @param tableName the table name of the materialization
+   * @param txnId the transaction id for the rebuild
+   * @return true if the lock is held by the given transaction, is still in
+   * ACQUIRED state, and its heartbeat time was refreshed; false otherwise
+   */
+  public boolean refreshLockResource(String dbName, String tableName, long txnId) {
+    final ResourceLock prevResourceLock = locks.get(Warehouse.getQualifiedName(dbName, tableName));
+    if (prevResourceLock == null || prevResourceLock.txnId != txnId ||
+        prevResourceLock.state.get() != State.ACQUIRED) {
+      // Lock was outdated and it was removed (then maybe another transaction picked it up)
+      // or changed its state
+      return false;
+    }
+    prevResourceLock.lastHeartBeatTime.set(System.currentTimeMillis());
+    return true;
+  }
+
+  /**
+   * Releases a certain lock.
+   * @param dbName the db name of the materialization
+   * @param tableName the table name of the materialization
+   * @param txnId the transaction id for the rebuild
+   * @return true if the lock could be released properly, false otherwise
+   */
+  public boolean unlockResource(String dbName, String tableName, long txnId) {
+    final String fullyQualifiedName = Warehouse.getQualifiedName(dbName, tableName);
+    final ResourceLock prevResourceLock = locks.get(fullyQualifiedName);
+    if (prevResourceLock == null || prevResourceLock.txnId != txnId) {
+      return false;
+    }
+    // Conditional remove: only drops the mapping if it still points to the lock we checked
+    return locks.remove(fullyQualifiedName, prevResourceLock);
+  }
+
+  /**
+   * Method that removes from the handler those locks that have expired.
+   * Only locks in ACQUIRED state can expire; a lock in COMMIT_READY state is
+   * left untouched even if its heartbeat is older than the timeout.
+   * @param timeout time in milliseconds after which we consider the locks to have expired
+   * @return the number of locks that were removed
+   */
+  public long cleanupResourceLocks(long timeout) {
+    long removed = 0L;
+    final long currentTime = System.currentTimeMillis();
+    for (Iterator<Map.Entry<String, ResourceLock>> it = locks.entrySet().iterator(); it.hasNext();) {
+      final Map.Entry<String, ResourceLock> entry = it.next();
+      final ResourceLock resourceLock = entry.getValue();
+      if (currentTime - resourceLock.lastHeartBeatTime.get() > timeout) {
+        if (resourceLock.state.compareAndSet(State.ACQUIRED, State.EXPIRED)) {
+          // Remove by (key, value) rather than through the iterator: the iterator
+          // removes whatever is mapped to the key, which could be a fresh lock
+          // taken by another transaction after this one was released.
+          if (locks.remove(entry.getKey(), resourceLock)) {
+            removed++;
+          }
+        }
+      }
+    }
+    return removed;
+  }
+
+  /**
+   * This class represents a lock that consists of transaction id,
+   * last refresh time (milliseconds, as given by System.currentTimeMillis()),
+   * and state.
+   */
+  private static final class ResourceLock {
+    final long txnId;
+    final AtomicLong lastHeartBeatTime;
+    final AtomicStateEnum state;
+
+    ResourceLock(long txnId, long lastHeartBeatTime, State state) {
+      this.txnId = txnId;
+      this.lastHeartBeatTime = new AtomicLong(lastHeartBeatTime);
+      this.state = new AtomicStateEnum(state);
+    }
+  }
+
+  private enum State {
+    // This is the initial state for a lock
+    ACQUIRED,
+    // This means that the lock is being committed at this instant, hence
+    // the cleaner should not remove it even if it times out. If transaction
+    // fails, the finally clause will remove the lock
+    COMMIT_READY,
+    // This means that the lock is ready to be cleaned, hence it cannot
+    // be committed anymore
+    EXPIRED;
+  }
+
+  /**
+   * Wrapper class around State enum to make its operations atomic.
+   */
+  private static final class AtomicStateEnum {
+    private final AtomicReference<State> ref;
+
+    public AtomicStateEnum(final State initialValue) {
+      this.ref = new AtomicReference<>(initialValue);
+    }
+
+    public void set(final State newValue) {
+      this.ref.set(newValue);
+    }
+
+    public State get() {
+      return this.ref.get();
+    }
+
+    public State getAndSet(final State newValue) {
+      return this.ref.getAndSet(newValue);
+    }
+
+    public boolean compareAndSet(final State expect, final State update) {
+      return this.ref.compareAndSet(expect, update);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/be420098/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 940a1bf..f007261 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.DefaultStorageSchemaReader;
import org.apache.hadoop.hive.metastore.HiveAlterHandler;
import org.apache.hadoop.hive.metastore.MaterializationsCacheCleanerTask;
+import org.apache.hadoop.hive.metastore.MaterializationsRebuildLockCleanerTask;
import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
import org.apache.hadoop.hive.metastore.events.EventCleanerTask;
import org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager;
@@ -728,7 +729,8 @@ public class MetastoreConf {
TASK_THREADS_ALWAYS("metastore.task.threads.always", "metastore.task.threads.always",
EventCleanerTask.class.getName() + "," +
"org.apache.hadoop.hive.metastore.repl.DumpDirCleanerTask" + "," +
- MaterializationsCacheCleanerTask.class.getName(),
+ MaterializationsCacheCleanerTask.class.getName() + "," +
+ MaterializationsRebuildLockCleanerTask.class.getName(),
"Comma separated list of tasks that will be started in separate threads. These will " +
"always be started, regardless of whether the metastore is running in embedded mode " +
"or in server mode. They must implement " + MetastoreTaskThread.class.getName()),