You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/07/03 17:47:47 UTC
[19/46] hive git commit: HIVE-19267: Replicate ACID/MM tables write
operations (Mahesh Kumar Behera, reviewed by Sankar Hariappan)
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
index f7018c2..ac1d3c8 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
@@ -59,6 +59,7 @@ public class ReplChangeManager {
static final String REMAIN_IN_TRASH_TAG = "user.remain-in-trash";
private static final String URI_FRAGMENT_SEPARATOR = "#";
public static final String SOURCE_OF_REPLICATION = "repl.source.for";
+ private static final String TXN_WRITE_EVENT_FILE_SEPARATOR = "]";
public enum RecycleType {
MOVE,
@@ -472,7 +473,6 @@ public class ReplChangeManager {
}
public static boolean isSourceOfReplication(Database db) {
- // Can not judge, so assuming replication is not enabled.
assert (db != null);
String replPolicyIds = getReplPolicyIdString(db);
return !StringUtils.isEmpty(replPolicyIds);
@@ -490,4 +490,12 @@ public class ReplChangeManager {
}
return null;
}
+
+ public static String joinWithSeparator(Iterable<?> strings) {
+ return org.apache.hadoop.util.StringUtils.join(TXN_WRITE_EVENT_FILE_SEPARATOR, strings);
+ }
+
+ public static String[] getListFromSeparatedString(String commaSeparatedString) {
+ return commaSeparatedString.split("\\s*" + TXN_WRITE_EVENT_FILE_SEPARATOR + "\\s*");
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
index a526019..8ff056f 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
@@ -111,6 +111,7 @@ import org.apache.hadoop.hive.metastore.api.UnknownTableException;
import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
import org.apache.hadoop.hive.metastore.api.WMMapping;
import org.apache.hadoop.hive.metastore.api.WMPool;
+import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
@@ -2414,6 +2415,17 @@ public class CachedStore implements RawStore, Configurable {
return sharedCache.getUpdateCount();
}
+ @Override
+ public void cleanWriteNotificationEvents(int olderThan) {
+ rawStore.cleanWriteNotificationEvents(olderThan);
+ }
+
+
+ @Override
+ public List<WriteEventInfo> getAllWriteEventInfo(long txnId, String dbName, String tableName) throws MetaException {
+ return rawStore.getAllWriteEventInfo(txnId, dbName, tableName);
+ }
+
static boolean isNotInBlackList(String catName, String dbName, String tblName) {
String str = TableName.getQualified(catName, dbName, tblName);
for (Pattern pattern : blacklistPatterns) {
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/events/AcidWriteEvent.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/events/AcidWriteEvent.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/events/AcidWriteEvent.java
new file mode 100644
index 0000000..001179a
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/events/AcidWriteEvent.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.events;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.WriteNotificationLogRequest;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+
+import java.util.List;
+
+/**
+ * AcidWriteEvent
+ * Event generated for acid write operations
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class AcidWriteEvent extends ListenerEvent {
+ private final WriteNotificationLogRequest writeNotificationLogRequest;
+ private final String partition;
+ private final Table tableObj;
+ private final Partition partitionObj;
+
+ public AcidWriteEvent(String partition, Table tableObj, Partition partitionObj,
+ WriteNotificationLogRequest writeNotificationLogRequest) {
+ super(true, null);
+ this.writeNotificationLogRequest = writeNotificationLogRequest;
+ this.partition = partition;
+ this.tableObj = tableObj;
+ this.partitionObj = partitionObj;
+ }
+
+ public Long getTxnId() {
+ return writeNotificationLogRequest.getTxnId();
+ }
+
+ public List<String> getFiles() {
+ return writeNotificationLogRequest.getFileInfo().getFilesAdded();
+ }
+
+ public List<String> getChecksums() {
+ return writeNotificationLogRequest.getFileInfo().getFilesAddedChecksum();
+ }
+
+ public String getDatabase() {
+ return StringUtils.normalizeIdentifier(writeNotificationLogRequest.getDb());
+ }
+
+ public String getTable() {
+ return StringUtils.normalizeIdentifier(writeNotificationLogRequest.getTable());
+ }
+
+ public String getPartition() {
+ return partition; //Don't normalize partition value, as its case sensitive.
+ }
+
+ public Long getWriteId() {
+ return writeNotificationLogRequest.getWriteId();
+ }
+
+ public Table getTableObj() {
+ return tableObj;
+ }
+
+ public Partition getPartitionObj() {
+ return partitionObj;
+ }
+
+ public List<String> getSubDirs() {
+ return writeNotificationLogRequest.getFileInfo().getSubDirectoryList();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/AcidWriteMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/AcidWriteMessage.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/AcidWriteMessage.java
new file mode 100644
index 0000000..e2c9ccf
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/AcidWriteMessage.java
@@ -0,0 +1,50 @@
+/* * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging;
+
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import java.util.List;
+
+/**
+ * HCat message sent when an ACID write is done.
+ */
+public abstract class AcidWriteMessage extends EventMessage {
+
+ protected AcidWriteMessage() {
+ super(EventType.ACID_WRITE);
+ }
+
+ public abstract Long getTxnId();
+
+ public abstract String getTable();
+
+ public abstract Long getWriteId();
+
+ public abstract String getPartition();
+
+ public abstract List<String> getFiles();
+
+ public abstract Table getTableObj() throws Exception;
+
+ public abstract Partition getPartitionObj() throws Exception;
+
+ public abstract String getTableObjStr();
+
+ public abstract String getPartitionObjStr();
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/CommitTxnMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/CommitTxnMessage.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/CommitTxnMessage.java
index 49004f2..9733039 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/CommitTxnMessage.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/CommitTxnMessage.java
@@ -17,6 +17,12 @@
package org.apache.hadoop.hive.metastore.messaging;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
+
+import java.util.List;
+
/**
* HCat message sent when an commit transaction is done.
*/
@@ -33,4 +39,21 @@ public abstract class CommitTxnMessage extends EventMessage {
*/
public abstract Long getTxnId();
+ public abstract List<Long> getWriteIds();
+
+ public abstract List<String> getDatabases();
+
+ public abstract List<String> getTables();
+
+ public abstract List<String> getPartitions();
+
+ public abstract Table getTableObj(int idx) throws Exception;
+
+ public abstract Partition getPartitionObj(int idx) throws Exception;
+
+ public abstract String getFiles(int idx);
+
+ public abstract List<String> getFilesList();
+
+ public abstract void addWriteEventInfo(List<WriteEventInfo> writeEventInfoList);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java
index 969dd7b..f24b419 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java
@@ -60,7 +60,8 @@ public abstract class EventMessage {
COMMIT_TXN(MessageFactory.COMMIT_TXN_EVENT),
ABORT_TXN(MessageFactory.ABORT_TXN_EVENT),
ALLOC_WRITE_ID(MessageFactory.ALLOC_WRITE_ID_EVENT),
- ALTER_CATALOG(MessageFactory.ALTER_CATALOG_EVENT);
+ ALTER_CATALOG(MessageFactory.ALTER_CATALOG_EVENT),
+ ACID_WRITE(MessageFactory.ACID_WRITE_EVENT);
private String typeString;
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java
index ca33579..b701d84 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java
@@ -70,6 +70,10 @@ public abstract class MessageDeserializer {
return getCommitTxnMessage(messageBody);
case ABORT_TXN:
return getAbortTxnMessage(messageBody);
+ case ALLOC_WRITE_ID:
+ return getAllocWriteIdMessage(messageBody);
+ case ACID_WRITE:
+ return getAcidWriteMessage(messageBody);
default:
throw new IllegalArgumentException("Unsupported event-type: " + eventTypeString);
}
@@ -186,6 +190,11 @@ public abstract class MessageDeserializer {
*/
public abstract AllocWriteIdMessage getAllocWriteIdMessage(String messageBody);
+ /*
+ * Method to de-serialize AcidWriteMessage instance.
+ */
+ public abstract AcidWriteMessage getAcidWriteMessage(String messageBody);
+
// Protection against construction.
protected MessageDeserializer() {}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
index e0629ea..d529147 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
import org.apache.hadoop.hive.metastore.utils.JavaUtils;
import java.util.Iterator;
@@ -74,6 +75,7 @@ public abstract class MessageFactory {
public static final String ABORT_TXN_EVENT = "ABORT_TXN";
public static final String ALLOC_WRITE_ID_EVENT = "ALLOC_WRITE_ID_EVENT";
public static final String ALTER_CATALOG_EVENT = "ALTER_CATALOG";
+ public static final String ACID_WRITE_EVENT = "ACID_WRITE_EVENT";
private static MessageFactory instance = null;
@@ -326,4 +328,14 @@ public abstract class MessageFactory {
public abstract DropCatalogMessage buildDropCatalogMessage(Catalog catalog);
public abstract AlterCatalogMessage buildAlterCatalogMessage(Catalog oldCat, Catalog newCat);
+
+ /**
+ * Factory method for building acid write message
+ *
+ *
+ * @param acidWriteEvent information related to the acid write operation
+ * @param files files added by this write operation
+ * @return instance of AcidWriteMessage
+ */
+ public abstract AcidWriteMessage buildAcidWriteMessage(AcidWriteEvent acidWriteEvent, Iterator<String> files);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAcidWriteMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAcidWriteMessage.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAcidWriteMessage.java
new file mode 100644
index 0000000..515a2cb
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAcidWriteMessage.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging.json;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
+import org.apache.hadoop.hive.metastore.messaging.AcidWriteMessage;
+import org.apache.thrift.TException;
+import org.codehaus.jackson.annotate.JsonProperty;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * JSON implementation of AcidWriteMessage
+ */
+public class JSONAcidWriteMessage extends AcidWriteMessage {
+
+ @JsonProperty
+ private Long txnid, writeId, timestamp;
+
+ @JsonProperty
+ private String server, servicePrincipal, database, table, partition, tableObjJson, partitionObjJson;
+
+ @JsonProperty
+ private List<String> files;
+
+ /**
+ * Default constructor, needed for Jackson.
+ */
+ public JSONAcidWriteMessage() {
+ }
+
+ public JSONAcidWriteMessage(String server, String servicePrincipal, Long timestamp, AcidWriteEvent acidWriteEvent,
+ Iterator<String> files) {
+ this.timestamp = timestamp;
+ this.txnid = acidWriteEvent.getTxnId();
+ this.server = server;
+ this.servicePrincipal = servicePrincipal;
+ this.database = acidWriteEvent.getDatabase();
+ this.table = acidWriteEvent.getTable();
+ this.writeId = acidWriteEvent.getWriteId();
+ this.partition = acidWriteEvent.getPartition();
+ try {
+ this.tableObjJson = JSONMessageFactory.createTableObjJson(acidWriteEvent.getTableObj());
+ if (acidWriteEvent.getPartitionObj() != null) {
+ this.partitionObjJson = JSONMessageFactory.createPartitionObjJson(acidWriteEvent.getPartitionObj());
+ } else {
+ this.partitionObjJson = null;
+ }
+ } catch (TException e) {
+ throw new IllegalArgumentException("Could not serialize JSONAcidWriteMessage : ", e);
+ }
+ this.files = Lists.newArrayList(files);
+ }
+
+ @Override
+ public Long getTxnId() {
+ return txnid;
+ }
+
+ @Override
+ public Long getTimestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public String getDB() {
+ return database;
+ }
+
+ @Override
+ public String getServicePrincipal() {
+ return servicePrincipal;
+ }
+
+ @Override
+ public String getServer() {
+ return server;
+ }
+
+ @Override
+ public String getTable() {
+ return table;
+ }
+
+ @Override
+ public Long getWriteId() {
+ return writeId;
+ }
+
+ @Override
+ public String getPartition() {
+ return partition;
+ }
+
+ @Override
+ public List<String> getFiles() {
+ return files;
+ }
+
+ @Override
+ public Table getTableObj() throws Exception {
+ return (tableObjJson == null) ? null : (Table) JSONMessageFactory.getTObj(tableObjJson, Table.class);
+ }
+
+ @Override
+ public Partition getPartitionObj() throws Exception {
+ return ((partitionObjJson == null) ? null :
+ (Partition) JSONMessageFactory.getTObj(partitionObjJson, Partition.class));
+ }
+
+ @Override
+ public String getTableObjStr() {
+ return tableObjJson;
+ }
+
+ @Override
+ public String getPartitionObjStr() {
+ return partitionObjJson;
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return JSONMessageDeserializer.mapper.writeValueAsString(this);
+ } catch (Exception exception) {
+ throw new IllegalArgumentException("Could not serialize: ", exception);
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java
index 595a3d1..6082b8e 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java
@@ -18,9 +18,15 @@
*/
package org.apache.hadoop.hive.metastore.messaging.json;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
import org.codehaus.jackson.annotate.JsonProperty;
+import java.util.List;
+
/**
* JSON implementation of CommitTxnMessage
*/
@@ -38,6 +44,12 @@ public class JSONCommitTxnMessage extends CommitTxnMessage {
@JsonProperty
private String servicePrincipal;
+ @JsonProperty
+ private List<Long> writeIds;
+
+ @JsonProperty
+ private List<String> databases, tables, partitions, tableObjs, partitionObjs, files;
+
/**
* Default constructor, needed for Jackson.
*/
@@ -49,6 +61,13 @@ public class JSONCommitTxnMessage extends CommitTxnMessage {
this.txnid = txnid;
this.server = server;
this.servicePrincipal = servicePrincipal;
+ this.databases = null;
+ this.tables = null;
+ this.writeIds = null;
+ this.partitions = null;
+ this.tableObjs = null;
+ this.partitionObjs = null;
+ this.files = null;
}
@Override
@@ -77,6 +96,82 @@ public class JSONCommitTxnMessage extends CommitTxnMessage {
}
@Override
+ public List<Long> getWriteIds() {
+ return writeIds;
+ }
+
+ @Override
+ public List<String> getDatabases() {
+ return databases;
+ }
+
+ @Override
+ public List<String> getTables() {
+ return tables;
+ }
+
+ @Override
+ public List<String> getPartitions() {
+ return partitions;
+ }
+
+ @Override
+ public Table getTableObj(int idx) throws Exception {
+ return tableObjs == null ? null : (Table) JSONMessageFactory.getTObj(tableObjs.get(idx), Table.class);
+ }
+
+ @Override
+ public Partition getPartitionObj(int idx) throws Exception {
+ return (partitionObjs == null ? null : (partitionObjs.get(idx) == null ? null :
+ (Partition)JSONMessageFactory.getTObj(partitionObjs.get(idx), Partition.class)));
+ }
+
+ @Override
+ public String getFiles(int idx) {
+ return files == null ? null : files.get(idx);
+ }
+
+ @Override
+ public List<String> getFilesList() {
+ return files;
+ }
+
+ @Override
+ public void addWriteEventInfo(List<WriteEventInfo> writeEventInfoList) {
+ if (this.databases == null) {
+ this.databases = Lists.newArrayList();
+ }
+ if (this.tables == null) {
+ this.tables = Lists.newArrayList();
+ }
+ if (this.writeIds == null) {
+ this.writeIds = Lists.newArrayList();
+ }
+ if (this.tableObjs == null) {
+ this.tableObjs = Lists.newArrayList();
+ }
+ if (this.partitions == null) {
+ this.partitions = Lists.newArrayList();
+ }
+ if (this.partitionObjs == null) {
+ this.partitionObjs = Lists.newArrayList();
+ }
+ if (this.files == null) {
+ this.files = Lists.newArrayList();
+ }
+
+ for (WriteEventInfo writeEventInfo : writeEventInfoList) {
+ this.databases.add(writeEventInfo.getDatabase());
+ this.tables.add(writeEventInfo.getTable());
+ this.writeIds.add(writeEventInfo.getWriteId());
+ this.partitions.add(writeEventInfo.getPartition());
+ this.tableObjs.add(writeEventInfo.getTableObj());
+ this.partitionObjs.add(writeEventInfo.getPartitionObj());
+ this.files.add(writeEventInfo.getFiles());
+ }
+ }
+
+ @Override
public String toString() {
try {
return JSONMessageDeserializer.mapper.writeValueAsString(this);
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java
index f54e24d..be6b751 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage;
import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage;
import org.apache.hadoop.hive.metastore.messaging.AllocWriteIdMessage;
+import org.apache.hadoop.hive.metastore.messaging.AcidWriteMessage;
import org.codehaus.jackson.map.DeserializationConfig;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
@@ -259,4 +260,12 @@ public class JSONMessageDeserializer extends MessageDeserializer {
throw new IllegalArgumentException("Could not construct AllocWriteIdMessage", e);
}
}
+
+ public AcidWriteMessage getAcidWriteMessage(String messageBody) {
+ try {
+ return mapper.readValue(messageBody, JSONAcidWriteMessage.class);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Could not construct AcidWriteMessage", e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
index d64c3ff..07f51f0 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
import org.apache.hadoop.hive.metastore.messaging.AddForeignKeyMessage;
import org.apache.hadoop.hive.metastore.messaging.AddNotNullConstraintMessage;
import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
@@ -66,6 +67,7 @@ import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage;
import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage;
import org.apache.hadoop.hive.metastore.messaging.AllocWriteIdMessage;
+import org.apache.hadoop.hive.metastore.messaging.AcidWriteMessage;
import org.apache.thrift.TBase;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
@@ -230,11 +232,17 @@ public class JSONMessageFactory extends MessageFactory {
return new JSONAbortTxnMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, txnId, now());
}
+ @Override
public AllocWriteIdMessage buildAllocWriteIdMessage(List<TxnToWriteId> txnToWriteIdList,
String dbName, String tableName) {
return new JSONAllocWriteIdMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, txnToWriteIdList, dbName, tableName, now());
}
+ @Override
+ public AcidWriteMessage buildAcidWriteMessage(AcidWriteEvent acidWriteEvent, Iterator<String> files) {
+ return new JSONAcidWriteMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, now(), acidWriteEvent, files);
+ }
+
private long now() {
return System.currentTimeMillis() / 1000;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MTxnWriteNotificationLog.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MTxnWriteNotificationLog.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MTxnWriteNotificationLog.java
new file mode 100644
index 0000000..f5ca386
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MTxnWriteNotificationLog.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.model;
+
+/**
+ * MTxnWriteNotificationLog
+ * DN table for ACID write events.
+ */
+public class MTxnWriteNotificationLog {
+ private long txnId;
+ private long writeId;
+ private int eventTime;
+ private String database;
+ private String table;
+ private String partition;
+ private String tableObject;
+ private String partObject;
+ private String files;
+
+ public MTxnWriteNotificationLog() {
+ }
+
+ public MTxnWriteNotificationLog(long txnId, long writeId, int eventTime, String database, String table,
+ String partition, String tableObject, String partObject, String files) {
+ this.txnId = txnId;
+ this.writeId = writeId;
+ this.eventTime = eventTime;
+ this.database = database;
+ this.table = table;
+ this.partition = partition;
+ this.tableObject = tableObject;
+ this.partObject = partObject;
+ this.files = files;
+ }
+
+ public long getTxnId() {
+ return txnId;
+ }
+
+ public void setTxnId(long txnId) {
+ this.txnId = txnId;
+ }
+
+ public long getWriteId() {
+ return writeId;
+ }
+
+ public void setWriteId(long writeId) {
+ this.writeId = writeId;
+ }
+
+ public int getEventTime() {
+ return eventTime;
+ }
+
+ public void setEventTime(int eventTime) {
+ this.eventTime = eventTime;
+ }
+
+ public String getDatabase() {
+ return database;
+ }
+
+ public void setDatabase(String database) {
+ this.database = database;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public void setTable(String table) {
+ this.table = table;
+ }
+
+ public String getPartition() {
+ return partition;
+ }
+
+ public void setPartition(String partition) {
+ this.partition = partition;
+ }
+
+ public String getTableObject() {
+ return tableObject;
+ }
+
+ public void setTableObject(String tableObject) {
+ this.tableObject = tableObject;
+ }
+
+ public String getPartObject() {
+ return partObject;
+ }
+
+ public void setPartObject(String partObject) {
+ this.partObject = partObject;
+ }
+
+ public String getFiles() {
+ return files;
+ }
+
+ public void setFiles(String files) {
+ this.files = files;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
index b23a6d7..d0ac7db 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
@@ -175,4 +175,13 @@ public final class SQLGenerator {
return dbProduct;
}
+ // This is required for SQL executed directly. If the SQL has double quotes then some dbs tend to
+ // remove the escape characters and store the variable without double quote.
+ public String addEscapeCharacters(String s) {
+ if (dbProduct == DatabaseProduct.MYSQL) {
+ return s.replaceAll("\\\\", "\\\\\\\\");
+ }
+ return s;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index 50bfca3..f8c2ca2 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -245,6 +245,34 @@ public final class TxnDbUtil {
stmt.execute("INSERT INTO \"APP\".\"NOTIFICATION_SEQUENCE\" (\"NNI_ID\", \"NEXT_EVENT_ID\")" +
" SELECT * FROM (VALUES (1,1)) tmp_table WHERE NOT EXISTS ( SELECT " +
"\"NEXT_EVENT_ID\" FROM \"APP\".\"NOTIFICATION_SEQUENCE\")");
+
+ try {
+ stmt.execute("CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (" +
+ "WNL_ID bigint NOT NULL," +
+ "WNL_TXNID bigint NOT NULL," +
+ "WNL_WRITEID bigint NOT NULL," +
+ "WNL_DATABASE varchar(128) NOT NULL," +
+ "WNL_TABLE varchar(128) NOT NULL," +
+ "WNL_PARTITION varchar(1024) NOT NULL," +
+ "WNL_TABLE_OBJ clob NOT NULL," +
+ "WNL_PARTITION_OBJ clob," +
+ "WNL_FILES clob," +
+ "WNL_EVENT_TIME integer NOT NULL," +
+ "PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION))"
+ );
+ } catch (SQLException e) {
+ if (e.getMessage() != null && e.getMessage().contains("already exists")) {
+ LOG.info("TXN_WRITE_NOTIFICATION_LOG table already exist, ignoring");
+ } else {
+ throw e;
+ }
+ }
+
+ stmt.execute("INSERT INTO \"APP\".\"SEQUENCE_TABLE\" (\"SEQUENCE_NAME\", \"NEXT_VAL\") " +
+ "SELECT * FROM (VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', " +
+ "1)) tmp_table WHERE NOT EXISTS ( SELECT \"NEXT_VAL\" FROM \"APP\"" +
+ ".\"SEQUENCE_TABLE\" WHERE \"SEQUENCE_NAME\" = 'org.apache.hadoop.hive.metastore" +
+ ".model.MTxnWriteNotificationLog')");
} catch (SQLException e) {
try {
conn.rollback();
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 361ede5..3785f89 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -76,6 +76,7 @@ import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent;
import org.apache.hadoop.hive.metastore.events.CommitTxnEvent;
import org.apache.hadoop.hive.metastore.events.OpenTxnEvent;
+import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
import org.apache.hadoop.hive.metastore.messaging.EventMessage;
import org.apache.hadoop.hive.metastore.metrics.Metrics;
import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
@@ -698,6 +699,38 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
@Override
@RetrySemantics.Idempotent
+ public long getTargetTxnId(String replPolicy, long sourceTxnId) throws MetaException {
+ try {
+ Connection dbConn = null;
+ Statement stmt = null;
+ try {
+ lockInternal();
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ List<Long> targetTxnIds = getTargetTxnIdList(replPolicy, Collections.singletonList(sourceTxnId), stmt);
+ if (targetTxnIds.isEmpty()) {
+ LOG.info("Txn {} not present for repl policy {}", sourceTxnId, replPolicy);
+ return -1;
+ }
+ assert (targetTxnIds.size() == 1);
+ return targetTxnIds.get(0);
+ } catch (SQLException e) {
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "getTargetTxnId(" + replPolicy + sourceTxnId + ")");
+ throw new MetaException("Unable to get target transaction id "
+ + StringUtils.stringifyException(e));
+ } finally {
+ close(null, stmt, dbConn);
+ unlockInternal();
+ }
+ } catch (RetryException e) {
+ return getTargetTxnId(replPolicy, sourceTxnId);
+ }
+ }
+
+ @Override
+ @RetrySemantics.Idempotent
public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException, TxnAbortedException {
long txnid = rqst.getTxnid();
long sourceTxnId = -1;
@@ -892,10 +925,17 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
shouldNeverHappen(txnid);
//dbConn is rolled back in finally{}
}
- String conflictSQLSuffix = "from TXN_COMPONENTS where tc_txnid=" + txnid + " and tc_operation_type IN(" +
- quoteChar(OpertaionType.UPDATE.sqlConst) + "," + quoteChar(OpertaionType.DELETE.sqlConst) + ")";
- rs = stmt.executeQuery(sqlGenerator.addLimitClause(1, "tc_operation_type " + conflictSQLSuffix));
- if (rs.next()) {
+
+ String conflictSQLSuffix = null;
+ if (rqst.isSetReplPolicy()) {
+ rs = null;
+ } else {
+ conflictSQLSuffix = "from TXN_COMPONENTS where tc_txnid=" + txnid + " and tc_operation_type IN(" +
+ quoteChar(OpertaionType.UPDATE.sqlConst) + "," + quoteChar(OpertaionType.DELETE.sqlConst) + ")";
+ rs = stmt.executeQuery(sqlGenerator.addLimitClause(1,
+ "tc_operation_type " + conflictSQLSuffix));
+ }
+ if (rs != null && rs.next()) {
isUpdateDelete = true;
close(rs);
//if here it means currently committing txn performed update/delete and we should check WW conflict
@@ -984,23 +1024,52 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
* Consider: if RO txn is after a W txn, then RO's openTxns() will be mutexed with W's
* commitTxn() because both do S4U on NEXT_TXN_ID and thus RO will see result of W txn.
* If RO < W, then there is no reads-from relationship.
+ * In replication flow we don't expect any write write conflict as it should have been handled at source.
*/
}
- // Move the record from txn_components into completed_txn_components so that the compactor
- // knows where to look to compact.
- String s = "insert into COMPLETED_TXN_COMPONENTS (ctc_txnid, ctc_database, " +
- "ctc_table, ctc_partition, ctc_writeid) select tc_txnid, tc_database, tc_table, " +
- "tc_partition, tc_writeid from TXN_COMPONENTS where tc_txnid = " + txnid;
- LOG.debug("Going to execute insert <" + s + ">");
- int modCount = 0;
- if ((modCount = stmt.executeUpdate(s)) < 1) {
- //this can be reasonable for an empty txn START/COMMIT or read-only txn
- //also an IUD with DP that didn't match any rows.
- LOG.info("Expected to move at least one record from txn_components to " +
- "completed_txn_components when committing txn! " + JavaUtils.txnIdToString(txnid));
+
+ String s;
+ if (!rqst.isSetReplPolicy()) {
+ // Move the record from txn_components into completed_txn_components so that the compactor
+ // knows where to look to compact.
+ s = "insert into COMPLETED_TXN_COMPONENTS (ctc_txnid, ctc_database, " +
+ "ctc_table, ctc_partition, ctc_writeid) select tc_txnid, tc_database, tc_table, " +
+ "tc_partition, tc_writeid from TXN_COMPONENTS where tc_txnid = " + txnid;
+ LOG.debug("Going to execute insert <" + s + ">");
+
+ if ((stmt.executeUpdate(s)) < 1) {
+ //this can be reasonable for an empty txn START/COMMIT or read-only txn
+ //also an IUD with DP that didn't match any rows.
+ LOG.info("Expected to move at least one record from txn_components to " +
+ "completed_txn_components when committing txn! " + JavaUtils.txnIdToString(txnid));
+ }
+ } else {
+ if (rqst.isSetWriteEventInfos()) {
+ List<String> rows = new ArrayList<>();
+ for (WriteEventInfo writeEventInfo : rqst.getWriteEventInfos()) {
+ rows.add(txnid + "," + quoteString(writeEventInfo.getDatabase()) + "," +
+ quoteString(writeEventInfo.getTable()) + "," +
+ quoteString(writeEventInfo.getPartition()) + "," +
+ writeEventInfo.getWriteId());
+ }
+ List<String> queries = sqlGenerator.createInsertValuesStmt("COMPLETED_TXN_COMPONENTS " +
+ "(ctc_txnid," + " ctc_database, ctc_table, ctc_partition, ctc_writeid)", rows);
+ for (String q : queries) {
+ LOG.debug("Going to execute insert <" + q + "> ");
+ stmt.execute(q);
+ }
+ }
+
+ s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId +
+ " and RTM_REPL_POLICY = " + quoteString(rqst.getReplPolicy());
+ LOG.info("Repl going to execute <" + s + ">");
+ stmt.executeUpdate(s);
}
+
// Obtain information that we need to update registry
- s = "select ctc_database, ctc_table, ctc_writeid, ctc_timestamp from COMPLETED_TXN_COMPONENTS where ctc_txnid = " + txnid;
+ s = "select ctc_database, ctc_table, ctc_writeid, ctc_timestamp from COMPLETED_TXN_COMPONENTS" +
+ " where ctc_txnid = " + txnid;
+
LOG.debug("Going to extract table modification information for invalidation cache <" + s + ">");
rs = stmt.executeQuery(s);
while (rs.next()) {
@@ -1008,27 +1077,21 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
txnComponents.add(new TransactionRegistryInfo(rs.getString(1), rs.getString(2),
rs.getLong(3), rs.getTimestamp(4, Calendar.getInstance(TimeZone.getTimeZone("UTC"))).getTime()));
}
+
+ // cleanup all txn related metadata
s = "delete from TXN_COMPONENTS where tc_txnid = " + txnid;
LOG.debug("Going to execute update <" + s + ">");
- modCount = stmt.executeUpdate(s);
+ stmt.executeUpdate(s);
s = "delete from HIVE_LOCKS where hl_txnid = " + txnid;
LOG.debug("Going to execute update <" + s + ">");
- modCount = stmt.executeUpdate(s);
+ stmt.executeUpdate(s);
s = "delete from TXNS where txn_id = " + txnid;
LOG.debug("Going to execute update <" + s + ">");
- modCount = stmt.executeUpdate(s);
+ stmt.executeUpdate(s);
s = "delete from MIN_HISTORY_LEVEL where mhl_txnid = " + txnid;
LOG.debug("Going to execute update <" + s + ">");
- modCount = stmt.executeUpdate(s);
+ stmt.executeUpdate(s);
LOG.info("Removed committed transaction: (" + txnid + ") from MIN_HISTORY_LEVEL");
-
- if (rqst.isSetReplPolicy()) {
- s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId +
- " and RTM_REPL_POLICY = " + quoteString(rqst.getReplPolicy());
- LOG.info("Repl going to execute <" + s + ">");
- stmt.executeUpdate(s);
- }
-
if (transactionalListeners != null) {
MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
EventMessage.EventType.COMMIT_TXN, new CommitTxnEvent(txnid, null), dbConn, sqlGenerator);
@@ -1548,6 +1611,43 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
@Override
+ @RetrySemantics.Idempotent
+ public void addWriteNotificationLog(AcidWriteEvent acidWriteEvent)
+ throws MetaException {
+ Connection dbConn = null;
+ try {
+ try {
+ //Idempotent case is handled by notify Event
+ lockInternal();
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
+ EventMessage.EventType.ACID_WRITE, acidWriteEvent, dbConn, sqlGenerator);
+ LOG.debug("Going to commit");
+ dbConn.commit();
+ return;
+ } catch (SQLException e) {
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ if (isDuplicateKeyError(e)) {
+ // in case of key duplicate error, retry as it might be because of race condition
+ if (waitForRetry("addWriteNotificationLog(" + acidWriteEvent + ")", e.getMessage())) {
+ throw new RetryException();
+ }
+ retryNum = 0;
+ throw new MetaException(e.getMessage());
+ }
+ checkRetryable(dbConn, e, "addWriteNotificationLog(" + acidWriteEvent + ")");
+ throw new MetaException("Unable to add write notification event " + StringUtils.stringifyException(e));
+ } finally{
+ closeDbConn(dbConn);
+ unlockInternal();
+ }
+ } catch (RetryException e) {
+ addWriteNotificationLog(acidWriteEvent);
+ }
+ }
+
+ @Override
@RetrySemantics.SafeToRetry
public void performWriteSetGC() {
Connection dbConn = null;
@@ -3046,6 +3146,22 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
closeStmt(stmt);
closeDbConn(dbConn);
}
+
+ private boolean waitForRetry(String caller, String errMsg) {
+ if (retryNum++ < retryLimit) {
+ LOG.warn("Retryable error detected in " + caller + ". Will wait " + retryInterval +
+ "ms and retry up to " + (retryLimit - retryNum + 1) + " times. Error: " + errMsg);
+ try {
+ Thread.sleep(retryInterval);
+ } catch (InterruptedException ex) {
+ //
+ }
+ return true;
+ } else {
+ LOG.error("Fatal error in " + caller + ". Retry limit (" + retryLimit + ") reached. Last error: " + errMsg);
+ }
+ return false;
+ }
/**
* Determine if an exception was such that it makes sense to retry. Unfortunately there is no standard way to do
* this, so we have to inspect the error messages and catch the telltale signs for each
@@ -3089,18 +3205,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
} else if (isRetryable(conf, e)) {
//in MSSQL this means Communication Link Failure
- if (retryNum++ < retryLimit) {
- LOG.warn("Retryable error detected in " + caller + ". Will wait " + retryInterval +
- "ms and retry up to " + (retryLimit - retryNum + 1) + " times. Error: " + getMessage(e));
- try {
- Thread.sleep(retryInterval);
- } catch (InterruptedException ex) {
- //
- }
- sendRetrySignal = true;
- } else {
- LOG.error("Fatal error in " + caller + ". Retry limit (" + retryLimit + ") reached. Last error: " + getMessage(e));
- }
+ sendRetrySignal = waitForRetry(caller, e.getMessage());
}
else {
//make sure we know we saw an error that we don't recognize
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index ef447e1..d972d10 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.common.classification.RetrySemantics;
import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
import java.sql.SQLException;
import java.util.Iterator;
@@ -86,6 +87,9 @@ public interface TxnStore extends Configurable {
@RetrySemantics.Idempotent
OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException;
+ @RetrySemantics.Idempotent
+ long getTargetTxnId(String replPolicy, long sourceTxnId) throws MetaException;
+
/**
* Abort (rollback) a transaction.
* @param rqst info on transaction to abort
@@ -476,4 +480,11 @@ public interface TxnStore extends Configurable {
*/
@RetrySemantics.Idempotent
void setHadoopJobId(String hadoopJobId, long id);
+
+ /**
+ * Add the ACID write event information to writeNotificationLog table.
+ * @param acidWriteEvent
+ */
+ @RetrySemantics.Idempotent
+ void addWriteNotificationLog(AcidWriteEvent acidWriteEvent) throws MetaException;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java
index 963e12f..154db4b 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java
@@ -511,7 +511,6 @@ public class FileUtils {
return new Path(scheme, authority, pathUri.getPath());
}
-
/**
* Returns a BEST GUESS as to whether or not other is a subdirectory of parent. It does not
* take into account any intricacies of the underlying file system, which is assumed to be
@@ -524,4 +523,15 @@ public class FileUtils {
public static boolean isSubdirectory(String parent, String other) {
return other.startsWith(parent.endsWith(Path.SEPARATOR) ? parent : parent + Path.SEPARATOR);
}
+
+ public static Path getTransformedPath(String name, String subDir, String root) {
+ if (root != null) {
+ Path newPath = new Path(root);
+ if (subDir != null) {
+ newPath = new Path(newPath, subDir);
+ }
+ return new Path(newPath, name);
+ }
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/resources/package.jdo
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/resources/package.jdo b/standalone-metastore/src/main/resources/package.jdo
index 1be3e98..5fb548c 100644
--- a/standalone-metastore/src/main/resources/package.jdo
+++ b/standalone-metastore/src/main/resources/package.jdo
@@ -1182,6 +1182,41 @@
</field>
</class>
+ <class name="MTxnWriteNotificationLog" table="TXN_WRITE_NOTIFICATION_LOG" identity-type="datastore" detachable="true">
+ <datastore-identity strategy="increment"/>
+ <datastore-identity key-cache-size="1"/>
+ <datastore-identity>
+ <column name="WNL_ID"/>
+ </datastore-identity>
+ <field name="txnId">
+ <column name="WNL_TXNID" jdbc-type="BIGINT" allows-null="false"/>
+ </field>
+ <field name="writeId">
+ <column name="WNL_WRITEID" jdbc-type="BIGINT" allows-null="false"/>
+ </field>
+ <field name="database">
+ <column name="WNL_DATABASE" length="128" jdbc-type="VARCHAR" allows-null="false"/>
+ </field>
+ <field name="table">
+ <column name="WNL_TABLE" length="128" jdbc-type="VARCHAR" allows-null="false"/>
+ </field>
+ <field name="partition">
+ <column name="WNL_PARTITION" length="1024" jdbc-type="VARCHAR" allows-null="false"/>
+ </field>
+ <field name="tableObject">
+ <column name="WNL_TABLE_OBJ" jdbc-type="LONGVARCHAR"/>
+ </field>
+ <field name="partObject">
+ <column name="WNL_PARTITION_OBJ" jdbc-type="LONGVARCHAR"/>
+ </field>
+ <field name="files">
+ <column name="WNL_FILES" jdbc-type="LONGVARCHAR"/>
+ </field>
+ <field name="eventTime">
+ <column name="WNL_EVENT_TIME" jdbc-type="INTEGER" allows-null="false"/>
+ </field>
+ </class>
+
<class name="MWMResourcePlan" identity-type="datastore" table="WM_RESOURCEPLAN" detachable="true">
<datastore-identity>
<column name="RP_ID"/>
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/derby/hive-schema-3.1.0.derby.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/derby/hive-schema-3.1.0.derby.sql b/standalone-metastore/src/main/sql/derby/hive-schema-3.1.0.derby.sql
index 352b43e..a696d06 100644
--- a/standalone-metastore/src/main/sql/derby/hive-schema-3.1.0.derby.sql
+++ b/standalone-metastore/src/main/sql/derby/hive-schema-3.1.0.derby.sql
@@ -689,6 +689,21 @@ CREATE TABLE "APP"."RUNTIME_STATS" (
CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME);
+CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (
+ WNL_ID bigint NOT NULL,
+ WNL_TXNID bigint NOT NULL,
+ WNL_WRITEID bigint NOT NULL,
+ WNL_DATABASE varchar(128) NOT NULL,
+ WNL_TABLE varchar(128) NOT NULL,
+ WNL_PARTITION varchar(1024) NOT NULL,
+ WNL_TABLE_OBJ clob NOT NULL,
+ WNL_PARTITION_OBJ clob,
+ WNL_FILES clob,
+ WNL_EVENT_TIME integer NOT NULL,
+ PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION)
+);
+INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
+
-- -----------------------------------------------------------------
-- Record schema version. Should be the last step in the init script
-- -----------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/derby/hive-schema-4.0.0.derby.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/derby/hive-schema-4.0.0.derby.sql b/standalone-metastore/src/main/sql/derby/hive-schema-4.0.0.derby.sql
index bb69105..7cab4fb 100644
--- a/standalone-metastore/src/main/sql/derby/hive-schema-4.0.0.derby.sql
+++ b/standalone-metastore/src/main/sql/derby/hive-schema-4.0.0.derby.sql
@@ -689,6 +689,21 @@ CREATE TABLE "APP"."RUNTIME_STATS" (
CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME);
+CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (
+ WNL_ID bigint NOT NULL,
+ WNL_TXNID bigint NOT NULL,
+ WNL_WRITEID bigint NOT NULL,
+ WNL_DATABASE varchar(128) NOT NULL,
+ WNL_TABLE varchar(128) NOT NULL,
+ WNL_PARTITION varchar(1024) NOT NULL,
+ WNL_TABLE_OBJ clob NOT NULL,
+ WNL_PARTITION_OBJ clob,
+ WNL_FILES clob,
+ WNL_EVENT_TIME integer NOT NULL,
+ PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION)
+);
+INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
+
-- -----------------------------------------------------------------
-- Record schema version. Should be the last step in the init script
-- -----------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql b/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql
index 7b7a8a2..10f1373 100644
--- a/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql
+++ b/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql
@@ -244,7 +244,6 @@ CREATE TABLE MIN_HISTORY_LEVEL (
CREATE INDEX MIN_HISTORY_LEVEL_IDX ON MIN_HISTORY_LEVEL (MHL_MIN_OPEN_TXNID);
-
CREATE TABLE "APP"."RUNTIME_STATS" (
"RS_ID" bigint primary key,
"CREATE_TIME" integer not null,
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/derby/upgrade-3.0.0-to-3.1.0.derby.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/derby/upgrade-3.0.0-to-3.1.0.derby.sql b/standalone-metastore/src/main/sql/derby/upgrade-3.0.0-to-3.1.0.derby.sql
index 6621ef7..7058ab0 100644
--- a/standalone-metastore/src/main/sql/derby/upgrade-3.0.0-to-3.1.0.derby.sql
+++ b/standalone-metastore/src/main/sql/derby/upgrade-3.0.0-to-3.1.0.derby.sql
@@ -29,5 +29,21 @@ ALTER TABLE TXNS ADD COLUMN TXN_TYPE integer;
CREATE INDEX "APP"."TAB_COL_STATS_IDX" ON "APP"."TAB_COL_STATS" ("CAT_NAME", "DB_NAME", "TABLE_NAME", "COLUMN_NAME");
+-- HIVE-19267
+CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (
+ WNL_ID bigint NOT NULL,
+ WNL_TXNID bigint NOT NULL,
+ WNL_WRITEID bigint NOT NULL,
+ WNL_DATABASE varchar(128) NOT NULL,
+ WNL_TABLE varchar(128) NOT NULL,
+ WNL_PARTITION varchar(1024) NOT NULL,
+ WNL_TABLE_OBJ clob NOT NULL,
+ WNL_PARTITION_OBJ clob,
+ WNL_FILES clob,
+ WNL_EVENT_TIME integer NOT NULL,
+ PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION)
+);
+INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
+
-- This needs to be the last thing done. Insert any changes above this line.
UPDATE "APP".VERSION SET SCHEMA_VERSION='3.1.0', VERSION_COMMENT='Hive release version 3.1.0' where VER_ID=1;
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/mssql/hive-schema-3.1.0.mssql.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/mssql/hive-schema-3.1.0.mssql.sql b/standalone-metastore/src/main/sql/mssql/hive-schema-3.1.0.mssql.sql
index bc11b40..d7722dc 100644
--- a/standalone-metastore/src/main/sql/mssql/hive-schema-3.1.0.mssql.sql
+++ b/standalone-metastore/src/main/sql/mssql/hive-schema-3.1.0.mssql.sql
@@ -1248,6 +1248,23 @@ CREATE TABLE RUNTIME_STATS (
CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME);
+CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (
+ WNL_ID bigint NOT NULL,
+ WNL_TXNID bigint NOT NULL,
+ WNL_WRITEID bigint NOT NULL,
+ WNL_DATABASE nvarchar(128) NOT NULL,
+ WNL_TABLE nvarchar(128) NOT NULL,
+ WNL_PARTITION nvarchar(1024) NOT NULL,
+ WNL_TABLE_OBJ text NOT NULL,
+ WNL_PARTITION_OBJ text,
+ WNL_FILES text,
+ WNL_EVENT_TIME int NOT NULL
+);
+
+ALTER TABLE TXN_WRITE_NOTIFICATION_LOG ADD CONSTRAINT TXN_WRITE_NOTIFICATION_LOG_PK PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION);
+
+INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
+
-- -----------------------------------------------------------------
-- Record schema version. Should be the last step in the init script
-- -----------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql b/standalone-metastore/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
index 922e8fe..a81fc40 100644
--- a/standalone-metastore/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
+++ b/standalone-metastore/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
@@ -1249,6 +1249,23 @@ CREATE TABLE RUNTIME_STATS (
CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME);
+CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (
+ WNL_ID bigint NOT NULL,
+ WNL_TXNID bigint NOT NULL,
+ WNL_WRITEID bigint NOT NULL,
+ WNL_DATABASE nvarchar(128) NOT NULL,
+ WNL_TABLE nvarchar(128) NOT NULL,
+ WNL_PARTITION nvarchar(1024) NOT NULL,
+ WNL_TABLE_OBJ text NOT NULL,
+ WNL_PARTITION_OBJ text,
+ WNL_FILES text,
+ WNL_EVENT_TIME int NOT NULL
+);
+
+ALTER TABLE TXN_WRITE_NOTIFICATION_LOG ADD CONSTRAINT TXN_WRITE_NOTIFICATION_LOG_PK PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION);
+
+INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
+
-- -----------------------------------------------------------------
-- Record schema version. Should be the last step in the init script
-- -----------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/mssql/upgrade-3.0.0-to-3.1.0.mssql.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/mssql/upgrade-3.0.0-to-3.1.0.mssql.sql b/standalone-metastore/src/main/sql/mssql/upgrade-3.0.0-to-3.1.0.mssql.sql
index abb80d6..41f23f7 100644
--- a/standalone-metastore/src/main/sql/mssql/upgrade-3.0.0-to-3.1.0.mssql.sql
+++ b/standalone-metastore/src/main/sql/mssql/upgrade-3.0.0-to-3.1.0.mssql.sql
@@ -30,6 +30,22 @@ ALTER TABLE TXNS ADD TXN_TYPE int NULL;
CREATE INDEX TAB_COL_STATS_IDX ON TAB_COL_STATS (CAT_NAME, DB_NAME, TABLE_NAME, COLUMN_NAME);
+-- HIVE-19267
+CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (
+ WNL_ID bigint NOT NULL,
+ WNL_TXNID bigint NOT NULL,
+ WNL_WRITEID bigint NOT NULL,
+ WNL_DATABASE nvarchar(128) NOT NULL,
+ WNL_TABLE nvarchar(128) NOT NULL,
+ WNL_PARTITION nvarchar(1024) NOT NULL,
+ WNL_TABLE_OBJ text NOT NULL,
+ WNL_PARTITION_OBJ text,
+ WNL_FILES text,
+ WNL_EVENT_TIME int NOT NULL
+);
+ALTER TABLE TXN_WRITE_NOTIFICATION_LOG ADD CONSTRAINT TXN_WRITE_NOTIFICATION_LOG_PK PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION);
+INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
+
-- These lines need to be last. Insert any changes above.
UPDATE VERSION SET SCHEMA_VERSION='3.1.0', VERSION_COMMENT='Hive release version 3.1.0' where VER_ID=1;
SELECT 'Finished upgrading MetaStore schema from 3.0.0 to 3.1.0' AS MESSAGE;
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql b/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql
index c54df55..c65af1e 100644
--- a/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql
+++ b/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql
@@ -1155,7 +1155,6 @@ CREATE TABLE REPL_TXN_MAP (
PRIMARY KEY (RTM_REPL_POLICY, RTM_SRC_TXN_ID)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
-
CREATE TABLE RUNTIME_STATS (
RS_ID bigint primary key,
CREATE_TIME bigint NOT NULL,
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/mysql/hive-schema-3.1.0.mysql.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/mysql/hive-schema-3.1.0.mysql.sql b/standalone-metastore/src/main/sql/mysql/hive-schema-3.1.0.mysql.sql
index af955dc..29d4a43 100644
--- a/standalone-metastore/src/main/sql/mysql/hive-schema-3.1.0.mysql.sql
+++ b/standalone-metastore/src/main/sql/mysql/hive-schema-3.1.0.mysql.sql
@@ -1173,6 +1173,22 @@ CREATE TABLE RUNTIME_STATS (
CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME);
+CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (
+ WNL_ID bigint NOT NULL,
+ WNL_TXNID bigint NOT NULL,
+ WNL_WRITEID bigint NOT NULL,
+ WNL_DATABASE varchar(128) NOT NULL,
+ WNL_TABLE varchar(128) NOT NULL,
+ WNL_PARTITION varchar(1024) NOT NULL,
+ WNL_TABLE_OBJ longtext NOT NULL,
+ WNL_PARTITION_OBJ longtext,
+ WNL_FILES longtext,
+ WNL_EVENT_TIME INT(11) NOT NULL,
+ PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION)
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+INSERT INTO `SEQUENCE_TABLE` (`SEQUENCE_NAME`, `NEXT_VAL`) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
+
-- -----------------------------------------------------------------
-- Record schema version. Should be the last step in the init script
-- -----------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql b/standalone-metastore/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
index 6c40e6e..968f4a4 100644
--- a/standalone-metastore/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
+++ b/standalone-metastore/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
@@ -1173,6 +1173,22 @@ CREATE TABLE RUNTIME_STATS (
CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME);
+CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (
+ WNL_ID bigint NOT NULL,
+ WNL_TXNID bigint NOT NULL,
+ WNL_WRITEID bigint NOT NULL,
+ WNL_DATABASE varchar(128) NOT NULL,
+ WNL_TABLE varchar(128) NOT NULL,
+ WNL_PARTITION varchar(1024) NOT NULL,
+ WNL_TABLE_OBJ longtext NOT NULL,
+ WNL_PARTITION_OBJ longtext,
+ WNL_FILES longtext,
+ WNL_EVENT_TIME INT(11) NOT NULL,
+ PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION)
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+INSERT INTO `SEQUENCE_TABLE` (`SEQUENCE_NAME`, `NEXT_VAL`) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
+
-- -----------------------------------------------------------------
-- Record schema version. Should be the last step in the init script
-- -----------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql b/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql
index 9b87563..786e38a 100644
--- a/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql
+++ b/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql
@@ -319,8 +319,8 @@ UPDATE COMPLETED_TXN_COMPONENTS SET CTC_WRITEID = CTC_TXNID;
ALTER TABLE TXN_COMPONENTS MODIFY COLUMN TC_TABLE varchar(128) NULL;
+ALTER TABLE `TBLS` ADD COLUMN `OWNER_TYPE` VARCHAR(10) CHARACTER SET latin1 COLLATE latin1_bin DEFAULT NULL;
+
-- These lines need to be last. Insert any changes above.
UPDATE VERSION SET SCHEMA_VERSION='3.0.0', VERSION_COMMENT='Hive release version 3.0.0' where VER_ID=1;
SELECT 'Finished upgrading MetaStore schema from 2.3.0 to 3.0.0' AS ' ';
-
-ALTER TABLE `TBLS` ADD COLUMN `OWNER_TYPE` VARCHAR(10) CHARACTER SET latin1 COLLATE latin1_bin DEFAULT NULL;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/mysql/upgrade-3.0.0-to-3.1.0.mysql.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/mysql/upgrade-3.0.0-to-3.1.0.mysql.sql b/standalone-metastore/src/main/sql/mysql/upgrade-3.0.0-to-3.1.0.mysql.sql
index 305fa1d..e103bef 100644
--- a/standalone-metastore/src/main/sql/mysql/upgrade-3.0.0-to-3.1.0.mysql.sql
+++ b/standalone-metastore/src/main/sql/mysql/upgrade-3.0.0-to-3.1.0.mysql.sql
@@ -30,6 +30,22 @@ ALTER TABLE TXNS ADD COLUMN TXN_TYPE int DEFAULT NULL;
CREATE INDEX TAB_COL_STATS_IDX ON TAB_COL_STATS (CAT_NAME, DB_NAME, TABLE_NAME, COLUMN_NAME) USING BTREE;
+-- HIVE-19267
+CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (
+ WNL_ID bigint NOT NULL,
+ WNL_TXNID bigint NOT NULL,
+ WNL_WRITEID bigint NOT NULL,
+ WNL_DATABASE varchar(128) NOT NULL,
+ WNL_TABLE varchar(128) NOT NULL,
+ WNL_PARTITION varchar(1024) NOT NULL,
+ WNL_TABLE_OBJ longtext NOT NULL,
+ WNL_PARTITION_OBJ longtext,
+ WNL_FILES longtext,
+ WNL_EVENT_TIME INT(11) NOT NULL,
+ PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION)
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+INSERT INTO `SEQUENCE_TABLE` (`SEQUENCE_NAME`, `NEXT_VAL`) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
+
-- These lines need to be last. Insert any changes above.
UPDATE VERSION SET SCHEMA_VERSION='3.1.0', VERSION_COMMENT='Hive release version 3.1.0' where VER_ID=1;
SELECT 'Finished upgrading MetaStore schema from 3.0.0 to 3.1.0' AS ' ';
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql b/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql
index 63cc1f7..3e2e282 100644
--- a/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql
+++ b/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql
@@ -1134,7 +1134,6 @@ CREATE TABLE RUNTIME_STATS (
CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME);
-
-- -----------------------------------------------------------------
-- Record schema version. Should be the last step in the init script
-- -----------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/oracle/hive-schema-3.1.0.oracle.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/oracle/hive-schema-3.1.0.oracle.sql b/standalone-metastore/src/main/sql/oracle/hive-schema-3.1.0.oracle.sql
index bc13703..9adea31 100644
--- a/standalone-metastore/src/main/sql/oracle/hive-schema-3.1.0.oracle.sql
+++ b/standalone-metastore/src/main/sql/oracle/hive-schema-3.1.0.oracle.sql
@@ -1143,6 +1143,21 @@ CREATE TABLE RUNTIME_STATS (
CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME);
+CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (
+ WNL_ID number(19) NOT NULL,
+ WNL_TXNID number(19) NOT NULL,
+ WNL_WRITEID number(19) NOT NULL,
+ WNL_DATABASE varchar(128) NOT NULL,
+ WNL_TABLE varchar(128) NOT NULL,
+ WNL_PARTITION varchar(1024) NOT NULL,
+ WNL_TABLE_OBJ clob NOT NULL,
+ WNL_PARTITION_OBJ clob,
+ WNL_FILES clob,
+ WNL_EVENT_TIME number(10) NOT NULL,
+ PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION)
+);
+
+INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
-- -----------------------------------------------------------------
-- Record schema version. Should be the last step in the init script
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql b/standalone-metastore/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
index e12150a..faca669 100644
--- a/standalone-metastore/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
+++ b/standalone-metastore/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
@@ -1143,6 +1143,21 @@ CREATE TABLE RUNTIME_STATS (
CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME);
+CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (
+ WNL_ID number(19) NOT NULL,
+ WNL_TXNID number(19) NOT NULL,
+ WNL_WRITEID number(19) NOT NULL,
+ WNL_DATABASE varchar(128) NOT NULL,
+ WNL_TABLE varchar(128) NOT NULL,
+ WNL_PARTITION varchar(1024) NOT NULL,
+ WNL_TABLE_OBJ clob NOT NULL,
+ WNL_PARTITION_OBJ clob,
+ WNL_FILES clob,
+ WNL_EVENT_TIME number(10) NOT NULL,
+ PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION)
+);
+
+INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
-- -----------------------------------------------------------------
-- Record schema version. Should be the last step in the init script
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql b/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql
index ce3437f..71f5034 100644
--- a/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql
+++ b/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql
@@ -335,8 +335,8 @@ INSERT INTO TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID, T2W_WRITEID)
UPDATE TXN_COMPONENTS SET TC_WRITEID = TC_TXNID;
UPDATE COMPLETED_TXN_COMPONENTS SET CTC_WRITEID = CTC_TXNID;
+ALTER TABLE TBLS ADD OWNER_TYPE VARCHAR2(10) NULL;
+
-- These lines need to be last. Insert any changes above.
UPDATE VERSION SET SCHEMA_VERSION='3.0.0', VERSION_COMMENT='Hive release version 3.0.0' where VER_ID=1;
SELECT 'Finished upgrading MetaStore schema from 2.3.0 to 3.0.0' AS Status from dual;
-
-ALTER TABLE TBLS ADD OWNER_TYPE VARCHAR2(10) NULL;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/oracle/upgrade-3.0.0-to-3.1.0.oracle.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/oracle/upgrade-3.0.0-to-3.1.0.oracle.sql b/standalone-metastore/src/main/sql/oracle/upgrade-3.0.0-to-3.1.0.oracle.sql
index ccdea54..cf8699b 100644
--- a/standalone-metastore/src/main/sql/oracle/upgrade-3.0.0-to-3.1.0.oracle.sql
+++ b/standalone-metastore/src/main/sql/oracle/upgrade-3.0.0-to-3.1.0.oracle.sql
@@ -30,6 +30,22 @@ ALTER TABLE TXNS ADD TXN_TYPE number(10) NULL;
CREATE INDEX TAB_COL_STATS_IDX ON TAB_COL_STATS (CAT_NAME, DB_NAME, TABLE_NAME, COLUMN_NAME);
+-- HIVE-19267
+CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (
+ WNL_ID number(19) NOT NULL,
+ WNL_TXNID number(19) NOT NULL,
+ WNL_WRITEID number(19) NOT NULL,
+ WNL_DATABASE varchar(128) NOT NULL,
+ WNL_TABLE varchar(128) NOT NULL,
+ WNL_PARTITION varchar(1024) NOT NULL,
+ WNL_TABLE_OBJ clob NOT NULL,
+ WNL_PARTITION_OBJ clob,
+ WNL_FILES clob,
+ WNL_EVENT_TIME number(10) NOT NULL,
+ PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION)
+);
+INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
+
-- These lines need to be last. Insert any changes above.
UPDATE VERSION SET SCHEMA_VERSION='3.1.0', VERSION_COMMENT='Hive release version 3.1.0' where VER_ID=1;
SELECT 'Finished upgrading MetaStore schema from 3.0.0 to 3.1.0' AS Status from dual;
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql b/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql
index 97697f8..b89c87f 100644
--- a/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql
+++ b/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql
@@ -1812,7 +1812,6 @@ CREATE TABLE REPL_TXN_MAP (
PRIMARY KEY (RTM_REPL_POLICY, RTM_SRC_TXN_ID)
);
-
CREATE TABLE RUNTIME_STATS (
RS_ID bigint primary key,
CREATE_TIME bigint NOT NULL,
@@ -1822,7 +1821,6 @@ CREATE TABLE RUNTIME_STATS (
CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME);
-
-- -----------------------------------------------------------------
-- Record schema version. Should be the last step in the init script
-- -----------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/postgres/hive-schema-3.1.0.postgres.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/postgres/hive-schema-3.1.0.postgres.sql b/standalone-metastore/src/main/sql/postgres/hive-schema-3.1.0.postgres.sql
index 36bab70..7a8a419 100644
--- a/standalone-metastore/src/main/sql/postgres/hive-schema-3.1.0.postgres.sql
+++ b/standalone-metastore/src/main/sql/postgres/hive-schema-3.1.0.postgres.sql
@@ -1834,6 +1834,21 @@ CREATE TABLE RUNTIME_STATS (
CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME);
+CREATE TABLE "TXN_WRITE_NOTIFICATION_LOG" (
+ "WNL_ID" bigint NOT NULL,
+ "WNL_TXNID" bigint NOT NULL,
+ "WNL_WRITEID" bigint NOT NULL,
+ "WNL_DATABASE" varchar(128) NOT NULL,
+ "WNL_TABLE" varchar(128) NOT NULL,
+ "WNL_PARTITION" varchar(1024) NOT NULL,
+ "WNL_TABLE_OBJ" text NOT NULL,
+ "WNL_PARTITION_OBJ" text,
+ "WNL_FILES" text,
+ "WNL_EVENT_TIME" integer NOT NULL,
+ PRIMARY KEY ("WNL_TXNID", "WNL_DATABASE", "WNL_TABLE", "WNL_PARTITION")
+);
+
+INSERT INTO "SEQUENCE_TABLE" ("SEQUENCE_NAME", "NEXT_VAL") VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
-- -----------------------------------------------------------------
-- Record schema version. Should be the last step in the init script
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql b/standalone-metastore/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
index b73e1d1..2e7ac5a 100644
--- a/standalone-metastore/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
+++ b/standalone-metastore/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
@@ -1836,6 +1836,21 @@ CREATE TABLE RUNTIME_STATS (
CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME);
+CREATE TABLE "TXN_WRITE_NOTIFICATION_LOG" (
+ "WNL_ID" bigint NOT NULL,
+ "WNL_TXNID" bigint NOT NULL,
+ "WNL_WRITEID" bigint NOT NULL,
+ "WNL_DATABASE" varchar(128) NOT NULL,
+ "WNL_TABLE" varchar(128) NOT NULL,
+ "WNL_PARTITION" varchar(1024) NOT NULL,
+ "WNL_TABLE_OBJ" text NOT NULL,
+ "WNL_PARTITION_OBJ" text,
+ "WNL_FILES" text,
+ "WNL_EVENT_TIME" integer NOT NULL,
+ PRIMARY KEY ("WNL_TXNID", "WNL_DATABASE", "WNL_TABLE", "WNL_PARTITION")
+);
+
+INSERT INTO "SEQUENCE_TABLE" ("SEQUENCE_NAME", "NEXT_VAL") VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
-- -----------------------------------------------------------------
-- Record schema version. Should be the last step in the init script
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/postgres/upgrade-3.0.0-to-3.1.0.postgres.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/postgres/upgrade-3.0.0-to-3.1.0.postgres.sql b/standalone-metastore/src/main/sql/postgres/upgrade-3.0.0-to-3.1.0.postgres.sql
index 2c0eb31..445c3a2 100644
--- a/standalone-metastore/src/main/sql/postgres/upgrade-3.0.0-to-3.1.0.postgres.sql
+++ b/standalone-metastore/src/main/sql/postgres/upgrade-3.0.0-to-3.1.0.postgres.sql
@@ -32,6 +32,22 @@ ALTER TABLE TXNS ADD COLUMN TXN_TYPE integer DEFAULT NULL;
CREATE INDEX "TAB_COL_STATS_IDX" ON "TAB_COL_STATS" USING btree ("CAT_NAME", "DB_NAME","TABLE_NAME","COLUMN_NAME");
+-- HIVE-19267
+CREATE TABLE "TXN_WRITE_NOTIFICATION_LOG" (
+ "WNL_ID" bigint NOT NULL,
+ "WNL_TXNID" bigint NOT NULL,
+ "WNL_WRITEID" bigint NOT NULL,
+ "WNL_DATABASE" varchar(128) NOT NULL,
+ "WNL_TABLE" varchar(128) NOT NULL,
+ "WNL_PARTITION" varchar(1024) NOT NULL,
+ "WNL_TABLE_OBJ" text NOT NULL,
+ "WNL_PARTITION_OBJ" text,
+ "WNL_FILES" text,
+ "WNL_EVENT_TIME" integer NOT NULL,
+ PRIMARY KEY ("WNL_TXNID", "WNL_DATABASE", "WNL_TABLE", "WNL_PARTITION")
+);
+INSERT INTO "SEQUENCE_TABLE" ("SEQUENCE_NAME", "NEXT_VAL") VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
+
-- These lines need to be last. Insert any changes above.
UPDATE "VERSION" SET "SCHEMA_VERSION"='3.1.0', "VERSION_COMMENT"='Hive release version 3.1.0' where "VER_ID"=1;
SELECT 'Finished upgrading MetaStore schema from 3.0.0 to 3.1.0';
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/thrift/hive_metastore.thrift
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/thrift/hive_metastore.thrift b/standalone-metastore/src/main/thrift/hive_metastore.thrift
index 6e503eb..1ca6454 100644
--- a/standalone-metastore/src/main/thrift/hive_metastore.thrift
+++ b/standalone-metastore/src/main/thrift/hive_metastore.thrift
@@ -867,6 +867,18 @@ struct AbortTxnsRequest {
struct CommitTxnRequest {
1: required i64 txnid,
2: optional string replPolicy,
+ // Information related to write operations done in this transaction.
+ 3: optional list<WriteEventInfo> writeEventInfos,
+}
+
+struct WriteEventInfo {
+ 1: required i64 writeId,
+ 2: required string database,
+ 3: required string table,
+ 4: required string files,
+ 5: optional string partition,
+ 6: optional string tableObj, // repl txn task does not need table object for commit
+ 7: optional string partitionObj,
}
struct ReplTblWriteIdStateRequest {
@@ -1102,6 +1114,8 @@ struct InsertEventRequestData {
2: required list<string> filesAdded,
// Checksum of files (hex string of checksum byte payload)
3: optional list<string> filesAddedChecksum,
+ // Used by acid operation to create the sub directory
+ 4: optional list<string> subDirectoryList,
}
union FireEventRequestData {
@@ -1122,7 +1136,20 @@ struct FireEventRequest {
struct FireEventResponse {
// NOP for now, this is just a place holder for future responses
}
-
+
+struct WriteNotificationLogRequest {
+ 1: required i64 txnId,
+ 2: required i64 writeId,
+ 3: required string db,
+ 4: required string table,
+ 5: required InsertEventRequestData fileInfo,
+ 6: optional list<string> partitionVals,
+}
+
+struct WriteNotificationLogResponse {
+ // NOP for now, this is just a place holder for future responses
+}
+
struct MetadataPpdResult {
1: optional binary metadata,
2: optional binary includeBitset
@@ -2104,6 +2131,7 @@ service ThriftHiveMetastore extends fb303.FacebookService
NotificationEventsCountResponse get_notification_events_count(1:NotificationEventsCountRequest rqst)
FireEventResponse fire_listener_event(1:FireEventRequest rqst)
void flushCache()
+ WriteNotificationLogResponse add_write_notification_log(WriteNotificationLogRequest rqst)
// Repl Change Management api
CmRecycleResponse cm_recycle(1:CmRecycleRequest request) throws(1:MetaException o1)
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
index c482d28..2454479 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
@@ -85,6 +85,7 @@ import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
import org.apache.hadoop.hive.metastore.api.UnknownTableException;
import org.apache.hadoop.hive.metastore.api.WMMapping;
import org.apache.hadoop.hive.metastore.api.WMPool;
+import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo;
import org.apache.thrift.TException;
@@ -1195,6 +1196,16 @@ public class DummyRawStoreControlledCommit implements RawStore, Configurable {
}
@Override
+ public void cleanWriteNotificationEvents(int olderThan) {
+ objectStore.cleanWriteNotificationEvents(olderThan);
+ }
+
+ @Override
+ public List<WriteEventInfo> getAllWriteEventInfo(long txnId, String dbName, String tableName) throws MetaException {
+ return objectStore.getAllWriteEventInfo(txnId, dbName, tableName);
+ }
+
+ @Override
public List<TableName> getTableNamesWithStats() throws MetaException,
NoSuchObjectException {
return null;
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
index d253005..9b79446 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
@@ -83,6 +83,7 @@ import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
import org.apache.hadoop.hive.metastore.api.UnknownTableException;
import org.apache.hadoop.hive.metastore.api.WMMapping;
import org.apache.hadoop.hive.metastore.api.WMPool;
+import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
@@ -1199,4 +1200,13 @@ public class DummyRawStoreForJdoConnection implements RawStore {
NoSuchObjectException {
return null;
}
+
+ @Override
+ public void cleanWriteNotificationEvents(int olderThan) {
+ }
+
+ @Override
+ public List<WriteEventInfo> getAllWriteEventInfo(long txnId, String dbName, String tableName) throws MetaException {
+ return null;
+ }
}