Posted to commits@hive.apache.org by ma...@apache.org on 2019/02/19 07:06:04 UTC

[hive] branch master updated: Hive replication to a target with hive.strict.managed.tables enabled is failing when using HMS on Postgres. (Mahesh Kumar Behera, reviewed by Sankar Hariappan)

This is an automated email from the ASF dual-hosted git repository.

mahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 104fa19  Hive replication to a target with hive.strict.managed.tables enabled is failing when using HMS on Postgres. (Mahesh Kumar Behera, reviewed by Sankar Hariappan)
104fa19 is described below

commit 104fa19a4a45aaaf7fae929c92e99977d1796a51
Author: Mahesh Kumar Behera <ma...@apache.org>
AuthorDate: Tue Feb 19 12:20:31 2019 +0530

    Hive replication to a target with hive.strict.managed.tables enabled is failing when using HMS on Postgres. (Mahesh Kumar Behera, reviewed by Sankar Hariappan)
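
Why this fails on Postgres: unlike the other databases the metastore supports, Postgres folds unquoted SQL identifiers to lower case, while the Postgres schema scripts create the metastore tables with quoted, case-sensitive names. The direct SQL in TxnHandler that records the 'repl.last.id' replication state therefore matched no relation until its identifiers were quoted, which is what this patch does. A minimal JDBC sketch of the corrected lookup (class and method names here are illustrative, not part of the patch):

    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;

    class QuotedIdentifierSketch {
      static String replLastId(Connection conn, long dbId) throws SQLException {
        try (Statement stmt = conn.createStatement();
             // Unquoted, Postgres would look up database_params and fail with
             // "relation does not exist"; quoting matches the case-sensitive
             // names the Postgres schema scripts create.
             ResultSet rs = stmt.executeQuery(
                 "select \"PARAM_VALUE\" from \"DATABASE_PARAMS\" where"
                 + " \"PARAM_KEY\" = 'repl.last.id' and \"DB_ID\" = " + dbId)) {
          return rs.next() ? rs.getString(1) : null;
        }
      }
    }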
---
 .../hadoop/hive/ql/parse/WarehouseInstance.java    | 26 +++++++++++++++++++
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java    |  1 +
 .../org/apache/hadoop/hive/ql/metadata/Hive.java   |  2 ++
 .../hadoop/hive/ql/parse/ReplicationSpec.java      |  7 +++++
 .../hadoop/hive/metastore/txn/TxnHandler.java      | 30 +++++++++++-----------
 5 files changed, 51 insertions(+), 15 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index bd3a557..c0d416c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -155,6 +155,32 @@ public class WarehouseInstance implements Closeable {
 
     MetaStoreTestUtils.startMetaStoreWithRetry(hiveConf, true);
 
+    // To test against MySQL or Postgres, add the dependencies below to metastore/pom.xml. For Postgres, also copy
+    // postgresql-42.2.1.jar over ~/.m2/repository/postgresql/postgresql/9.3-1102.jdbc41/postgresql-9.3-1102.jdbc41.jar.
+    /*
+    <dependency>
+      <groupId>mysql</groupId>
+      <artifactId>mysql-connector-java</artifactId>
+      <version>8.0.15</version>
+    </dependency>
+
+    <dependency>
+      <groupId>postgresql</groupId>
+      <artifactId>postgresql</artifactId>
+      <version>9.3-1102.jdbc41</version>
+    </dependency>
+    */
+
+    /*hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, "jdbc:mysql://localhost:3306/APP");
+    hiveConf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER, "com.mysql.jdbc.Driver");
+    hiveConf.setVar(HiveConf.ConfVars.METASTOREPWD, "hivepassword");
+    hiveConf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME, "hiveuser");*/
+
+    /*hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY,"jdbc:postgresql://localhost/app");
+    hiveConf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER, "org.postgresql.Driver");
+    hiveConf.setVar(HiveConf.ConfVars.METASTOREPWD, "password");
+    hiveConf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME, "postgres");*/
+
     driver = DriverFactory.newDriver(hiveConf);
     SessionState.start(new CliSessionState(hiveConf));
     client = new HiveMetaStoreClient(hiveConf);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index cb7fdf7..b02cdf8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -4712,6 +4712,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
       if (existingTable != null){
         if (crtTbl.getReplicationSpec().allowEventReplacementInto(existingTable.getParameters())){
           crtTbl.setReplaceMode(true); // we replace existing table.
+          ReplicationSpec.copyLastReplId(existingTable.getParameters(), tbl.getParameters());
         } else {
           LOG.debug("DDLTask: Create Table is skipped as table {} is newer than update",
                   crtTbl.getTableName());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 5cfd0a8..7343eed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -132,6 +132,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.session.CreateTableAutomaticGrant;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde2.Deserializer;
@@ -2972,6 +2973,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
             org.apache.hadoop.hive.metastore.api.Partition ptn =
                 getMSC().getPartition(addPartitionDesc.getDbName(), addPartitionDesc.getTableName(), p.getValues());
             if (addPartitionDesc.getReplicationSpec().allowReplacementInto(ptn.getParameters())){
+              ReplicationSpec.copyLastReplId(ptn.getParameters(), p.getParameters());
               partsToAlter.add(p);
             } // else ptn already exists, but we do nothing with it.
           } catch (NoSuchObjectException nsoe){
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
index b087831..d55ee20 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
@@ -419,4 +419,11 @@ public class ReplicationSpec {
   public void setMigratingToExternalTable() {
     isMigratingToExternalTable = true;
   }
+
+  public static void copyLastReplId(Map<String, String> srcParameter, Map<String, String> destParameter) {
+    String lastReplId = srcParameter.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
+    if (lastReplId != null) {
+      destParameter.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), lastReplId);
+    }
+  }
 }
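
The new ReplicationSpec.copyLastReplId helper simply propagates the current replication state id (KEY.CURR_STATE_ID, i.e. 'repl.last.id') from one parameter map to another; the DDLTask and Hive.java hunks above call it when an event replaces an existing table or partition during repl load. A standalone usage sketch, with hypothetical parameter maps and values:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.hive.ql.parse.ReplicationSpec;

    class CopyLastReplIdSketch {
      public static void main(String[] args) {
        Map<String, String> existingParams = new HashMap<>();
        existingParams.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), "590");
        Map<String, String> replacementParams = new HashMap<>();
        // Carries the last replication state id forward so the replaced table
        // or partition keeps it; a no-op when the source map lacks the key.
        ReplicationSpec.copyLastReplId(existingParams, replacementParams);
      }
    }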
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 6df7680..fd85af9 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -897,13 +897,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
       if (needUpdateDBReplId) {
        // select for update is not used as this row is only updated by a single thread, from repl load
-        rs = stmt.executeQuery("select PARAM_VALUE from DATABASE_PARAMS where PARAM_KEY = " +
-                "'repl.last.id' and DB_ID = " + dbId);
+        rs = stmt.executeQuery("select \"PARAM_VALUE\" from \"DATABASE_PARAMS\" where \"PARAM_KEY\" = " +
+                "'repl.last.id' and \"DB_ID\" = " + dbId);
         if (!rs.next()) {
-          query = "insert into DATABASE_PARAMS values ( " + dbId + " , 'repl.last.id' , ? )";
+          query = "insert into \"DATABASE_PARAMS\" values ( " + dbId + " , 'repl.last.id' , ? )";
         } else {
-          query = "update DATABASE_PARAMS set PARAM_VALUE = ? where DB_ID = " + dbId +
-                  " and PARAM_KEY = 'repl.last.id'";
+          query = "update \"DATABASE_PARAMS\" set \"PARAM_VALUE\" = ? where \"DB_ID\" = " + dbId +
+                  " and \"PARAM_KEY\" = 'repl.last.id'";
         }
         close(rs);
         params = Arrays.asList(lastReplId);
@@ -935,13 +935,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       pst.close();
 
       // select for update is not required as only one task will update this during repl load.
-      rs = stmt.executeQuery("select PARAM_VALUE from TABLE_PARAMS where PARAM_KEY = " +
-              "'repl.last.id' and TBL_ID = " + tblId);
+      rs = stmt.executeQuery("select \"PARAM_VALUE\" from \"TABLE_PARAMS\" where \"PARAM_KEY\" = " +
+              "'repl.last.id' and \"TBL_ID\" = " + tblId);
       if (!rs.next()) {
-        query = "insert into TABLE_PARAMS values ( " + tblId + " , 'repl.last.id' , ? )";
+        query = "insert into \"TABLE_PARAMS\" values ( " + tblId + " , 'repl.last.id' , ? )";
       } else {
-        query = "update TABLE_PARAMS set PARAM_VALUE = ? where TBL_ID = " + tblId +
-                " and PARAM_KEY = 'repl.last.id'";
+        query = "update \"TABLE_PARAMS\" set \"PARAM_VALUE\" = ? where \"TBL_ID\" = " + tblId +
+                " and \"PARAM_KEY\" = 'repl.last.id'";
       }
       rs.close();
 
@@ -988,13 +988,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         prs = pst.executeQuery();
         while (prs.next()) {
           long partId = prs.getLong(1);
-          rs = stmt.executeQuery("select PARAM_VALUE from PARTITION_PARAMS where PARAM_KEY " +
-                  " = 'repl.last.id' and PART_ID = " + partId);
+          rs = stmt.executeQuery("select \"PARAM_VALUE\" from \"PARTITION_PARAMS\" where \"PARAM_KEY\" " +
+                  " = 'repl.last.id' and \"PART_ID\" = " + partId);
           if (!rs.next()) {
-            query = "insert into PARTITION_PARAMS values ( " + partId + " , 'repl.last.id' , ? )";
+            query = "insert into \"PARTITION_PARAMS\" values ( " + partId + " , 'repl.last.id' , ? )";
           } else {
-            query = "update PARTITION_PARAMS set PARAM_VALUE = ? " +
-                    " where PART_ID = " + partId + " and PARAM_KEY = 'repl.last.id'";
+            query = "update \"PARTITION_PARAMS\" set \"PARAM_VALUE\" = ? " +
+                    " where \"PART_ID\" = " + partId + " and \"PARAM_KEY\" = 'repl.last.id'";
           }
           rs.close();
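
The TxnHandler hunks above repeat one pattern per object level (database, table, partition): probe for an existing 'repl.last.id' row, then insert or update it through a PreparedStatement. As the comments note, plain statements suffice because only the single repl-load task writes these rows. A condensed sketch of the table-level case (class and method names are illustrative, not part of the patch):

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;

    class ReplLastIdUpsertSketch {
      // Safe without select-for-update only under the patch's assumption that
      // a single repl-load task is the sole writer of these rows.
      static void upsert(Connection conn, long tblId, String lastReplId)
          throws SQLException {
        String query;
        try (Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                 "select \"PARAM_VALUE\" from \"TABLE_PARAMS\" where"
                 + " \"PARAM_KEY\" = 'repl.last.id' and \"TBL_ID\" = " + tblId)) {
          query = rs.next()
              ? "update \"TABLE_PARAMS\" set \"PARAM_VALUE\" = ? where \"TBL_ID\" = "
                  + tblId + " and \"PARAM_KEY\" = 'repl.last.id'"
              : "insert into \"TABLE_PARAMS\" values ( " + tblId
                  + " , 'repl.last.id' , ? )";
        }
        try (PreparedStatement pst = conn.prepareStatement(query)) {
          pst.setString(1, lastReplId);
          pst.executeUpdate();
        }
      }
    }

Quoting the identifiers keeps these queries valid on Postgres, and the quoted upper-case names still resolve on the other supported databases (MySQL needs ANSI_QUOTES for double-quoted identifiers, which the metastore enables on its sessions).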