Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/07/08 16:13:16 UTC

[GitHub] [hudi] lw309637554 opened a new pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

lw309637554 opened a new pull request #1810:
URL: https://github.com/apache/hudi/pull/1810


   ## *Tips*
   - *Thank you very much for contributing to Apache Hudi.*
   - *Please review https://hudi.apache.org/contributing.html before opening a pull request.*
   
   ## What is the purpose of the pull request
   
   Abstract hudi-sync-common, and migrate hudi-hive-sync to hudi-sync-hive. hudi-sync-hive implements hudi-sync-common.
   
   This will then make it possible to support hudi-sync-aliyun-dla, which will also implement hudi-sync-common.
   
   This is the RFC: https://cwiki.apache.org/confluence/display/HUDI/RFC+-+17+Abstract+common+meta+sync+module+support+multiple+meta+service.
   
   The old PR is https://github.com/apache/hudi/pull/1716, which only abstracted hudi-sync-common.
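   
   To make the extension point concrete, here is a minimal sketch of a third-party tool plugging into hudi-sync-common (the class name MyMetaSyncTool is hypothetical; the (Properties, FileSystem) constructor and the syncHoodieTable() contract are taken from the code discussed later in this thread):
   
       import java.util.Properties;
       
       import org.apache.hadoop.fs.FileSystem;
       import org.apache.hudi.sync.common.AbstractSyncTool;
       
       public class MyMetaSyncTool extends AbstractSyncTool {
       
         public MyMetaSyncTool(Properties props, FileSystem fileSystem) {
           super(props, fileSystem);
         }
       
         @Override
         public void syncHoodieTable() {
           // push table and partition metadata to the target meta service here
         }
       }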
   
   ## Brief change log
   
   *(for example:)*
     - *Modify AnnotationLocation checkstyle rule in checkstyle.xml*
   
   ## Verify this pull request
   
   *(Please pick either of the following options)*
   
   This pull request is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This pull request is already covered by existing tests, such as *(please describe tests)*.
   
    *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
   
     - *Added integration tests for end-to-end.*
     - *Added HoodieClientWriteTest to verify the change.*
     - *Manually verified the change by running a job locally.*
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r465449363



##########
File path: hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sync.common;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public abstract class AbstractSyncHoodieClient {
+  private static final Logger LOG = LogManager.getLogger(AbstractSyncHoodieClient.class);
+  protected final HoodieTableMetaClient metaClient;
+  protected HoodieTimeline activeTimeline;
+  protected final HoodieTableType tableType;
+  protected final FileSystem fs;
+  private String basePath;
+  private boolean assumeDatePartitioning;
+
+  public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, FileSystem fs) {
+    this.metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
+    this.tableType = metaClient.getTableType();
+    this.basePath = basePath;
+    this.assumeDatePartitioning = assumeDatePartitioning;
+    this.fs = fs;
+    this.activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+  }
+
+  public abstract void createTable(String tableName, MessageType storageSchema,
+                                   String inputFormatClass, String outputFormatClass, String serdeClass);
+
+  public abstract boolean doesTableExist(String tableName);
+
+  public abstract Option<String> getLastCommitTimeSynced(String tableName);
+
+  public abstract void updateLastCommitTimeSynced(String tableName);
+
+  public abstract void addPartitionsToTable(String tableName, List<String> partitionsToAdd);
+
+  public abstract void updatePartitionsToTable(String tableName, List<String> changedPartitions);
+
+  public abstract Map<String, String> getTableSchema(String tableName);
+
+  public HoodieTimeline getActiveTimeline() {
+    return activeTimeline;
+  }
+
+  public HoodieTableType getTableType() {
+    return tableType;
+  }
+
+  public String getBasePath() {
+    return metaClient.getBasePath();
+  }
+
+  public FileSystem getFs() {
+    return fs;
+  }
+
+  public void closeQuietly(ResultSet resultSet, Statement stmt) {
+    try {
+      if (stmt != null) {
+        stmt.close();
+      }
+    } catch (SQLException e) {
+      LOG.error("Could not close the statement opened ", e);

Review comment:
       done




----------------------------------------------------------------



[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r465495815



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
##########
@@ -67,10 +77,15 @@ String getMetricsName(String action, String metric) {
     return config == null ? null : String.format("%s.%s.%s", tableName, action, metric);
   }
 
-  public void updateDeltaStreamerMetrics(long durationInNs, long hiveSyncNs) {
+  public void updateDeltaStreamerMetrics(long durationInNs) {
     if (config.isMetricsOn()) {
       Metrics.registerGauge(getMetricsName("deltastreamer", "duration"), getDurationInMs(durationInNs));
-      Metrics.registerGauge(getMetricsName("deltastreamer", "hiveSyncDuration"), getDurationInMs(hiveSyncNs));
+    }
+  }
+
+  public void updateDeltaStreamerMetaSyncMetrics(String syncClassName, long syncNs) {

Review comment:
       OK, I will do it.
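   
       For reference, a hedged sketch of what the completed method body might look like — getMetricsName, getDurationInMs and Metrics.registerGauge are the helpers visible in the diff above; the exact gauge naming is an assumption:
   
           public void updateDeltaStreamerMetaSyncMetrics(String syncClassName, long syncNs) {
             if (config.isMetricsOn()) {
               // one gauge per sync tool, e.g. "<table>.deltastreamer.org.apache.hudi.hive.HiveSyncTool"
               Metrics.registerGauge(getMetricsName("deltastreamer", syncClassName), getDurationInMs(syncNs));
             }
           }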




----------------------------------------------------------------



[GitHub] [hudi] vinothchandar commented on pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#issuecomment-662830316


   @lw309637554 can you please give me a couple of days? I am trying to prioritize all the 0.6.0 blockers for now. 


----------------------------------------------------------------



[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r465450971



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -475,12 +480,38 @@ private String startCommit() {
     throw lastException;
   }
 
-  /**
-   * Sync to Hive.
-   */
-  public void syncHiveIfNeeded() {
+  private void syncMeta(HoodieDeltaStreamerMetrics metrics) {
+    String syncClientToolClass = cfg.syncClientToolClass;
+    // for backward compatibility
     if (cfg.enableHiveSync) {
-      syncHive();
+      cfg.enableMetaSync = true;
+      syncClientToolClass = String.format("%s,%s", cfg.syncClientToolClass, "org.apache.hudi.hive.HiveSyncTool");

Review comment:
       done
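   
       For readers following the thread, a self-contained sketch of the resolved behaviour — the legacy flag appends HiveSyncTool to the configured class list, which is then de-duplicated before each impl is loaded by reflection (com.example.MySyncTool is a hypothetical user-supplied tool):
   
           import java.util.LinkedHashSet;
           import java.util.Set;
   
           public class SyncToolClassListDemo {
             public static void main(String[] args) {
               String syncClientToolClass = "com.example.MySyncTool"; // hypothetical --sync-tool-classes value
               boolean enableHiveSync = true;                         // legacy --enable-hive-sync
   
               // for backward compatibility: the legacy flag appends the Hive tool
               if (enableHiveSync) {
                 syncClientToolClass = String.format("%s,%s", syncClientToolClass, "org.apache.hudi.hive.HiveSyncTool");
               }
   
               // de-duplicate while preserving order
               Set<String> impls = new LinkedHashSet<>();
               for (String impl : syncClientToolClass.split(",")) {
                 impls.add(impl.trim());
               }
               System.out.println(impls); // [com.example.MySyncTool, org.apache.hudi.hive.HiveSyncTool]
             }
           }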




----------------------------------------------------------------



[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r465434113



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##########
@@ -267,9 +267,16 @@ public Operation convert(String value) throws ParameterException {
         description = "Should duplicate records from source be dropped/filtered out before insert/bulk-insert")
     public Boolean filterDupes = false;
 
    // will be deprecated in a future version; use --enable-sync instead
     @Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing to hive")
     public Boolean enableHiveSync = false;
 
+    @Parameter(names = {"--enable-sync"}, description = "Enable syncing meta")
+    public Boolean enableMetaSync = false;
+
+    @Parameter(names = {"--sync-tool-classes"}, description = "Meta sync client tool, using comma to separate multi tools")
+    public String syncClientToolClass = "org.apache.hudi.hive.HiveSyncTool";

Review comment:
       done
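   
       To illustrate how the two new flags parse together, a hedged, self-contained JCommander sketch mirroring the @Parameter declarations above (com.example.MySyncTool is hypothetical):
   
           import com.beust.jcommander.JCommander;
           import com.beust.jcommander.Parameter;
   
           public class SyncFlagsDemo {
             @Parameter(names = {"--enable-sync"}, description = "Enable syncing meta")
             public Boolean enableMetaSync = false;
   
             @Parameter(names = {"--sync-tool-classes"}, description = "Meta sync client tool, using comma to separate multi tools")
             public String syncClientToolClass = "org.apache.hudi.hive.HiveSyncTool";
   
             public static void main(String[] args) {
               SyncFlagsDemo cfg = new SyncFlagsDemo();
               new JCommander(cfg).parse("--enable-sync", "--sync-tool-classes",
                   "org.apache.hudi.hive.HiveSyncTool,com.example.MySyncTool");
               System.out.println(cfg.enableMetaSync + " -> " + cfg.syncClientToolClass);
             }
           }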




----------------------------------------------------------------



[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r452157405



##########
File path: hudi-sync/hudi-hive-sync/pom.xml
##########
@@ -43,6 +45,11 @@
       <artifactId>hudi-hadoop-mr</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-sync-common</artifactId>

Review comment:
       Thanks. I think putting the hudi-sync-common base classes into hudi-common makes sense, but placing hudi-sync-hive, hudi-sync-dla, etc. directly under the hudi/ directory would add too many top-level modules.
   Another option is to put the hudi-sync-common base classes into hudi-common, and keep hudi-sync-hive, hudi-sync-dla, etc. under hudi-sync.
   What is your suggestion @vinothchandar @leesf ?




----------------------------------------------------------------



[GitHub] [hudi] lw309637554 commented on pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#issuecomment-656086937


   > @lw309637554 Thanks for your contribution! Overall looks good.
   > Open to discuss where to put these modules. I vote to put the base class to `hudi-common` and have separate modules for different query engines.
   
   Thanks so much for your very valuable suggestion.


----------------------------------------------------------------



[GitHub] [hudi] garyli1019 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
garyli1019 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r455483211



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -255,6 +262,43 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig
   }
 
+  private def metaSync(parameters: Map[String, String],
+                       basePath: Path,
+                       hadoopConf: Configuration): Boolean = {
+    val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var metaSyncEnabled = parameters.get(HUDI_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var syncClientToolClass = parameters.get(SYNC_CLIENT_TOOL_CLASS).get
+    // for backward compatibility
+    if (hiveSyncEnabled) {
+      metaSyncEnabled = true
+      syncClientToolClass = DEFAULT_SYNC_CLIENT_TOOL_CLASS
+    }
+    var metaSyncSuccess = true
+    if (metaSyncEnabled) {
+      val impls = syncClientToolClass.split(",")
+      impls.foreach(impl => {
+        val syncSuccess = impl.trim match {
+          case DEFAULT_SYNC_CLIENT_TOOL_CLASS => {
+            log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
+            val fs = FSUtils.getFs(basePath.toString, hadoopConf)
+            syncHive(basePath, fs, parameters)
+          }
+          case _ => {
+            val fs = FSUtils.getFs(basePath.toString, hadoopConf)
+            val properties = new Properties();
+            properties.putAll(parameters)
+            properties.put("basePath", basePath.toString)
+            val syncHoodie = ReflectionUtils.loadClass(impl.trim, Array[Class[_]](classOf[Properties], classOf[FileSystem]), properties, fs).asInstanceOf[AbstractSyncTool]
+            syncHoodie.syncHoodieTable()

Review comment:
       Is there any concern with not using this approach for `syncHive()`?




----------------------------------------------------------------



[GitHub] [hudi] lw309637554 commented on pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#issuecomment-667553349


   > @lw309637554 would you please rebase and fix the conflicts.
   
   okay, done


----------------------------------------------------------------



[GitHub] [hudi] vinothchandar commented on pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#issuecomment-668974757


   @lw309637554 is this ready for a final review? 


----------------------------------------------------------------



[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r453190766



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##########
@@ -237,6 +237,9 @@ public Operation convert(String value) throws ParameterException {
     @Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing to hive")
     public Boolean enableHiveSync = false;
 
+    @Parameter(names = {"--hoodie-sync-client-tool-class"}, description = "Meta sync client tool, using comma to separate multi tools")

Review comment:
       OK, I can add a new --enable-sync as the default choice, and also keep --enable-hive-sync as a duplicate parameter for compatibility. What do you think? @garyli1019 @leesf 




----------------------------------------------------------------



[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r455531592



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -255,6 +262,43 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig
   }
 
+  private def metaSync(parameters: Map[String, String],
+                       basePath: Path,
+                       hadoopConf: Configuration): Boolean = {
+    val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var metaSyncEnabled = parameters.get(HUDI_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var syncClientToolClass = parameters.get(SYNC_CLIENT_TOOL_CLASS).get
+    // for backward compatibility
+    if (hiveSyncEnabled) {
+      metaSyncEnabled = true
+      syncClientToolClass = DEFAULT_SYNC_CLIENT_TOOL_CLASS
+    }
+    var metaSyncSuccess = true
+    if (metaSyncEnabled) {
+      val impls = syncClientToolClass.split(",")
+      impls.foreach(impl => {
+        val syncSuccess = impl.trim match {
+          case DEFAULT_SYNC_CLIENT_TOOL_CLASS => {
+            log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
+            val fs = FSUtils.getFs(basePath.toString, hadoopConf)
+            syncHive(basePath, fs, parameters)
+          }
+          case _ => {
+            val fs = FSUtils.getFs(basePath.toString, hadoopConf)
+            val properties = new Properties();
+            properties.putAll(parameters)
+            properties.put("basePath", basePath.toString)
+            val syncHoodie = ReflectionUtils.loadClass(impl.trim, Array[Class[_]](classOf[Properties], classOf[FileSystem]), properties, fs).asInstanceOf[AbstractSyncTool]
+            syncHoodie.syncHoodieTable()

Review comment:
       Because the HoodieHiveClient constructor is different from the (Properties, FileSystem) constructor that the reflection-based path expects.




----------------------------------------------------------------



[GitHub] [hudi] lw309637554 commented on pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#issuecomment-656092089


   @vinothchandar I made some mistakes, so I just opened a new PR.
   The new PR fixes the build break. Here are my thoughts on backward compatibility:
   a. User code compatibility: since the hive-sync class names are not modified, users do not need to change their existing code.
   b. User pom dependency compatibility: since the module name hudi-hive-sync is not modified, users do not need to change their pom dependencies.
   c. User local jar compatibility: if users' local jars do not shade hudi-hive-sync's indirect dependencies, they just need to add hudi-sync-common.jar to their directory, like the hudi-utilities-bundle change in this PR.
   Reply: we can do this by putting the hudi-sync-common base classes into hudi-common.
   d. Sync parameters: only new parameters are added, so compatibility is fine.
   
   What is your suggestion?


----------------------------------------------------------------



[GitHub] [hudi] leesf commented on pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
leesf commented on pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#issuecomment-667530374


   @lw309637554 would you please rebase and fix the conflicts.


----------------------------------------------------------------



[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r452160822



##########
File path: hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SchemaDifference.java
##########
@@ -20,12 +20,14 @@
 
 import org.apache.parquet.schema.MessageType;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.StringJoiner;
+import java.util.ArrayList;

Review comment:
       okay

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -63,10 +67,11 @@
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Objects;
+import java.util.Properties;

Review comment:
       thanks




----------------------------------------------------------------



[GitHub] [hudi] lw309637554 commented on pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#issuecomment-659182664


   > Hi @lw309637554 , I totally understand the importance of backward compatibility. IMO, it would be great if we can remove the hive dependency from `hudi-spark` and `hudi-utilities`. If we treat `syncHive` separately, we still need to include some Hive related packages in these two modules.
   > I had this dependency issue before while I was testing the delta streamer. I didn't use Hive at all but I needed to resolve some Hive dependency conflicts in my production environment. So I'd incline to sacrifice some backward compatibility and move all the Hive related packages to `hudi-hive-sync`. Do you think this is possible?
   > Happy to hear what you guys think.
   
   This is a good point. I think removing the Hive dependency from hudi-spark and hudi-utilities is separate work; we can open another issue to resolve it.


----------------------------------------------------------------



[GitHub] [hudi] lw309637554 commented on pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#issuecomment-659747098


   > > This is a good point. I think removing the Hive dependency from hudi-spark and hudi-utilities is separate work; we can open another issue to resolve it.
   > 
   > Sounds good. Follow-up ticket: https://issues.apache.org/jira/browse/HUDI-1101
   
   Very nice.


----------------------------------------------------------------



[GitHub] [hudi] lw309637554 commented on pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#issuecomment-661736470


   @vinothchandar The PR is ready overall. Can you help review it?


----------------------------------------------------------------



[GitHub] [hudi] lw309637554 commented on pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#issuecomment-669248590


   > @lw309637554 this seems ready?
   
   Yes, but the test failed for some reason; I re-ran it.


----------------------------------------------------------------



[GitHub] [hudi] garyli1019 commented on pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
garyli1019 commented on pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#issuecomment-659601445


   > This is a good point. I think removing the Hive dependency from hudi-spark and hudi-utilities is separate work; we can open another issue to resolve it.
   
   Sounds good. Follow-up ticket: https://issues.apache.org/jira/browse/HUDI-1101


----------------------------------------------------------------



[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r452153918



##########
File path: hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sync.common;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public abstract class AbstractSyncHoodieClient {
+  private static final Logger LOG = LogManager.getLogger(AbstractSyncHoodieClient.class);
+  protected final HoodieTableMetaClient metaClient;
+  protected HoodieTimeline activeTimeline;
+  protected final HoodieTableType tableType;
+  protected final FileSystem fs;
+  private String basePath;
+  private boolean assumeDatePartitioning;
+
+  public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, FileSystem fs) {
+    this.metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
+    this.tableType = metaClient.getTableType();
+    this.basePath = basePath;
+    this.assumeDatePartitioning = assumeDatePartitioning;
+    this.fs = fs;
+    this.activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+  }
+
+  public abstract void createTable(String tableName, MessageType storageSchema,
+                                   String inputFormatClass, String outputFormatClass, String serdeClass);
+
+  public abstract boolean doesTableExist(String tableName);
+
+  public abstract Option<String> getLastCommitTimeSynced(String tableName);
+
+  public abstract void updateLastCommitTimeSynced(String tableName);
+
+  public abstract void addPartitionsToTable(String tableName, List<String> partitionsToAdd);
+
+  public abstract void updatePartitionsToTable(String tableName, List<String> changedPartitions);
+
+  public HoodieTimeline getActiveTimeline() {
+    return activeTimeline;
+  }
+
+  public HoodieTableType getTableType() {
+    return tableType;
+  }
+
+  public String getBasePath() {
+    return metaClient.getBasePath();
+  }
+
+  public FileSystem getFs() {
+    return fs;
+  }
+
+  public void closeQuietly(ResultSet resultSet, Statement stmt) {
+    try {
+      if (stmt != null) {
+        stmt.close();
+      }
+    } catch (SQLException e) {
+      LOG.error("Could not close the statement opened ", e);
+    }
+
+    try {
+      if (resultSet != null) {
+        resultSet.close();
+      }
+    } catch (SQLException e) {
+      LOG.error("Could not close the resultset opened ", e);
+    }
+  }
+
+  /**
+   * Gets the schema for a hoodie table. Depending on the type of table, try to read schema from commit metadata if
+   * present, else fallback to reading from any file written in the latest commit. We will assume that the schema has
+   * not changed within a single atomic write.
+   *
+   * @return Parquet schema for this table
+   */
+  public MessageType getDataSchema() {
+    try {
+      return new TableSchemaResolver(metaClient).getTableParquetSchema();
+    } catch (Exception e) {
+      throw new HoodieSyncException("Failed to read data schema", e);
+    }
+  }
+
+  @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+  public List<String> getPartitionsWrittenToSince(Option<String> lastCommitTimeSynced) {
+    if (!lastCommitTimeSynced.isPresent()) {
+      LOG.info("Last commit time synced is not known, listing all partitions in " + basePath + ",FS :" + fs);
+      try {
+        return FSUtils.getAllPartitionPaths(fs, basePath, assumeDatePartitioning);
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to list all partitions in " + basePath, e);
+      }
+    } else {
+      LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", Getting commits since then");
+
+      HoodieTimeline timelineToSync = activeTimeline.findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE);
+      return timelineToSync.getInstants().map(s -> {
+        try {
+          return HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(s).get(), HoodieCommitMetadata.class);
+        } catch (IOException e) {
+          throw new HoodieIOException("Failed to get partitions written since " + lastCommitTimeSynced, e);
+        }
+      }).flatMap(s -> s.getPartitionToWriteStats().keySet().stream()).distinct().collect(Collectors.toList());
+    }
+  }
+
+  private MessageType readSchemaFromLastCompaction(Option<HoodieInstant> lastCompactionCommitOpt) throws IOException {

Review comment:
       A very good suggestion. I just reused readSchemaFromLastCompaction from the old hudi-hive-sync module. I think I can replace readSchemaFromLastCompaction with TableSchemaResolver.readSchemaFromLastCompaction().
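   
       A hedged sketch of that replacement inside AbstractSyncHoodieClient — the TableSchemaResolver method name comes from the comment above and is assumed to keep the same signature:
   
           private MessageType readSchemaFromLastCompaction(Option<HoodieInstant> lastCompactionCommitOpt) throws IOException {
             // delegate to the shared resolver instead of keeping a private copy here
             return new TableSchemaResolver(metaClient).readSchemaFromLastCompaction(lastCompactionCommitOpt);
           }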




----------------------------------------------------------------



[GitHub] [hudi] leesf commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
leesf commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r465416217



##########
File path: hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
##########
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.dla;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hive.HiveSyncConfig;
+import org.apache.hudi.hive.HoodieHiveSyncException;
+import org.apache.hudi.hive.PartitionValueExtractor;
+import org.apache.hudi.hive.SchemaDifference;
+import org.apache.hudi.hive.util.HiveSchemaUtil;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class HoodieDLAClient extends AbstractSyncHoodieClient {
+  private static final Logger LOG = LogManager.getLogger(HoodieDLAClient.class);
+  private static final String HOODIE_LAST_COMMIT_TIME_SYNC = "hoodie_last_sync";
+  // Make sure we have the dla JDBC driver in classpath
+  private static final String DRIVER_NAME = "com.mysql.jdbc.Driver";
+  private static final String DLA_ESCAPE_CHARACTER = "";
+  private static final String TBL_PROPERTIES_STR = "TBLPROPERTIES";
+
+  static {
+    try {
+      Class.forName(DRIVER_NAME);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException("Could not find " + DRIVER_NAME + " in classpath. ", e);
+    }
+  }
+
+  private Connection connection;
+  private DLASyncConfig dlaConfig;
+  private PartitionValueExtractor partitionValueExtractor;
+
+  public HoodieDLAClient(DLASyncConfig syncConfig, FileSystem fs) {
+    super(syncConfig.basePath, syncConfig.assumeDatePartitioning, fs);
+    this.dlaConfig = syncConfig;
+    try {
+      this.partitionValueExtractor =
+          (PartitionValueExtractor) Class.forName(dlaConfig.partitionValueExtractorClass).newInstance();
+    } catch (Exception e) {
+      throw new HoodieException(
+          "Failed to initialize PartitionValueExtractor class " + dlaConfig.partitionValueExtractorClass, e);
+    }
+    createDLAConnection();
+  }
+
+  private void createDLAConnection() {
+    if (connection == null) {
+      try {
+        Class.forName(DRIVER_NAME);
+      } catch (ClassNotFoundException e) {
+        LOG.error("Unable to load DLA driver class", e);
+        return;
+      }
+      try {
+        this.connection = DriverManager.getConnection(dlaConfig.jdbcUrl, dlaConfig.dlaUser, dlaConfig.dlaPass);
+        LOG.info("Successfully established DLA connection to  " + dlaConfig.jdbcUrl);
+      } catch (SQLException e) {
+        throw new HoodieException("Cannot create dla connection ", e);
+      }
+    }
+  }
+
+  @Override
+  public void createTable(String tableName, MessageType storageSchema, String inputFormatClass, String outputFormatClass, String serdeClass) {
+    try {
+      String createSQLQuery = HiveSchemaUtil.generateCreateDDL(tableName, storageSchema, toHiveSyncConfig(), inputFormatClass, outputFormatClass, serdeClass);
+      LOG.info("Creating table with " + createSQLQuery);
+      updateDLASQL(createSQLQuery);
+    } catch (IOException e) {
+      throw new HoodieException("Failed to create table " + tableName, e);
+    }
+  }
+
+  public Map<String, String> getTableSchema(String tableName) {
+    if (!doesTableExist(tableName)) {
+      throw new IllegalArgumentException(
+          "Failed to get schema for table " + tableName + " does not exist");
+    }
+    Map<String, String> schema = new HashMap<>();
+    ResultSet result = null;
+    try {
+      DatabaseMetaData databaseMetaData = connection.getMetaData();
+      result = databaseMetaData.getColumns(dlaConfig.databaseName, dlaConfig.databaseName, tableName, null);
+      while (result.next()) {
+        String columnName = result.getString(4);
+        String columnType = result.getString(6);
+        if ("DECIMAL".equals(columnType)) {
+          int columnSize = result.getInt("COLUMN_SIZE");
+          int decimalDigits = result.getInt("DECIMAL_DIGITS");
+          columnType += String.format("(%s,%s)", columnSize, decimalDigits);
+        }
+        schema.put(columnName, columnType);
+      }
+      return schema;
+    } catch (SQLException e) {
+      throw new HoodieException("Failed to get table schema for " + tableName, e);
+    } finally {
+      closeQuietly(result, null);
+    }
+  }
+
+  @Override
+  public void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {
+    if (partitionsToAdd.isEmpty()) {
+      LOG.info("No partitions to add for " + tableName);
+      return;
+    }
+    LOG.info("Adding partitions " + partitionsToAdd.size() + " to table " + tableName);
+    String sql = constructAddPartitions(tableName, partitionsToAdd);
+    updateDLASQL(sql);
+  }
+
+  public String constructAddPartitions(String tableName, List<String> partitions) {
+    return constructDLAAddPartitions(tableName, partitions);
+  }
+
+  String generateAbsolutePathStr(Path path) {
+    String absolutePathStr = path.toString();
+    if (path.toUri().getScheme() == null) {
+      absolutePathStr = getDefaultFs() + absolutePathStr;
+    }
+    return absolutePathStr.endsWith("/") ? absolutePathStr : absolutePathStr + "/";
+  }
+
+  public List<String> constructChangePartitions(String tableName, List<String> partitions) {
+    List<String> changePartitions = new ArrayList<>();
+    String useDatabase = "USE " + DLA_ESCAPE_CHARACTER + dlaConfig.databaseName + DLA_ESCAPE_CHARACTER;
+    changePartitions.add(useDatabase);
+    String alterTable = "ALTER TABLE " + DLA_ESCAPE_CHARACTER + tableName + DLA_ESCAPE_CHARACTER;
+    for (String partition : partitions) {
+      String partitionClause = getPartitionClause(partition);
+      Path partitionPath = FSUtils.getPartitionPath(dlaConfig.basePath, partition);
+      String fullPartitionPathStr = generateAbsolutePathStr(partitionPath);
+      String changePartition =
+          alterTable + " ADD IF NOT EXISTS PARTITION (" + partitionClause + ") LOCATION '" + fullPartitionPathStr + "'";
+      changePartitions.add(changePartition);
+    }
+    return changePartitions;
+  }
+
+  /**
+   * Generate Hive Partition from partition values.
+   *
+   * @param partition Partition path
+   * @return
+   */
+  public String getPartitionClause(String partition) {
+    List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
+    ValidationUtils.checkArgument(dlaConfig.partitionFields.size() == partitionValues.size(),
+        "Partition key parts " + dlaConfig.partitionFields + " does not match with partition values " + partitionValues
+            + ". Check partition strategy. ");
+    List<String> partBuilder = new ArrayList<>();
+    for (int i = 0; i < dlaConfig.partitionFields.size(); i++) {
+      partBuilder.add(dlaConfig.partitionFields.get(i) + "='" + partitionValues.get(i) + "'");
+    }
+    return partBuilder.stream().collect(Collectors.joining(","));
+  }
+
+  private String constructDLAAddPartitions(String tableName, List<String> partitions) {
+    StringBuilder alterSQL = new StringBuilder("ALTER TABLE ");
+    alterSQL.append(DLA_ESCAPE_CHARACTER).append(dlaConfig.databaseName)
+        .append(DLA_ESCAPE_CHARACTER).append(".").append(DLA_ESCAPE_CHARACTER)
+        .append(tableName).append(DLA_ESCAPE_CHARACTER).append(" ADD IF NOT EXISTS ");
+    for (String partition : partitions) {
+      String partitionClause = getPartitionClause(partition);
+      Path partitionPath = FSUtils.getPartitionPath(dlaConfig.basePath, partition);
+      String fullPartitionPathStr = generateAbsolutePathStr(partitionPath);
+      alterSQL.append("  PARTITION (").append(partitionClause).append(") LOCATION '").append(fullPartitionPathStr)
+          .append("' ");
+    }
+    return alterSQL.toString();
+  }
+
+  private void updateDLASQL(String sql) {
+    Statement stmt = null;
+    try {
+      stmt = connection.createStatement();
+      LOG.info("Executing SQL " + sql);
+      stmt.execute(sql);
+    } catch (SQLException e) {
+      throw new HoodieException("Failed in executing SQL " + sql, e);
+    } finally {
+      closeQuietly(null, stmt);
+    }
+  }
+
+  @Override
+  public boolean doesTableExist(String tableName) {
+    String sql = consutructShowCreateTableSQL(tableName);
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      stmt = connection.createStatement();
+      rs = stmt.executeQuery(sql);
+    } catch (SQLException e) {
+      return false;
+    } finally {
+      closeQuietly(rs, stmt);
+    }
+    return true;
+  }
+
+  @Override
+  public Option<String> getLastCommitTimeSynced(String tableName) {
+    String sql = consutructShowCreateTableSQL(tableName);
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      stmt = connection.createStatement();
+      rs = stmt.executeQuery(sql);
+      if (rs.next()) {
+        String table = rs.getString(2);
+        Map<String, String> attr = new HashMap<>();
+        int index = table.indexOf(TBL_PROPERTIES_STR);
+        if (index != -1) {
+          String sub = table.substring(index + TBL_PROPERTIES_STR.length());
+          sub = sub.replaceAll("\\(", "").replaceAll("\\)", "").replaceAll("'", "");
+          String[] str = sub.split(",");
+
+          for (int i = 0; i < str.length; i++) {
+            String key = str[i].split("=")[0].trim();
+            String value = str[i].split("=")[1].trim();
+            attr.put(key, value);
+          }
+        }
+        return Option.ofNullable(attr.getOrDefault(HOODIE_LAST_COMMIT_TIME_SYNC, null));
+      }
+      return Option.empty();
+    } catch (Exception e) {
+      throw new HoodieHiveSyncException("Failed to get the last commit time synced from the table", e);
+    } finally {
+      closeQuietly(rs, stmt);
+    }
+  }
+
+  @Override
+  public void updateLastCommitTimeSynced(String tableName) {
+    // dla do not support update tblproperties, so do nothing.

Review comment:
       please use // TODO here
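   
       i.e. something along these lines (a sketch of the requested change):
   
           @Override
           public void updateLastCommitTimeSynced(String tableName) {
             // TODO: DLA does not support updating TBLPROPERTIES yet; persist the last
             // synced commit time here once it does.
           }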




----------------------------------------------------------------



[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r453190458



##########
File path: hudi-sync/hudi-hive-sync/pom.xml
##########
@@ -43,6 +45,11 @@
       <artifactId>hudi-hadoop-mr</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-sync-common</artifactId>

Review comment:
       Thanks, being consistent with hudi-client would be better.

##########
File path: hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
##########
@@ -272,6 +268,7 @@ void createTable(String tableName, MessageType storageSchema, String inputFormat
   /**
    * Get the table schema.
    */
+  //???? overwrite

Review comment:
       done

##########
File path: hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SchemaDifference.java
##########
@@ -20,12 +20,14 @@
 
 import org.apache.parquet.schema.MessageType;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.StringJoiner;
+import java.util.ArrayList;

Review comment:
       done

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -63,10 +67,11 @@
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Objects;
+import java.util.Properties;

Review comment:
       done




----------------------------------------------------------------



[GitHub] [hudi] vinothchandar commented on pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#issuecomment-667785088


   This is on my plate for this week.  


----------------------------------------------------------------



[GitHub] [hudi] leesf commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
leesf commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r465415868



##########
File path: hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.dla;
+
+import com.beust.jcommander.JCommander;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.dla.util.Utils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.InvalidTableException;
+import org.apache.hudi.hadoop.HoodieParquetInputFormat;
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
+import org.apache.hudi.hive.SchemaDifference;
+import org.apache.hudi.hive.util.HiveSchemaUtil;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+import org.apache.hudi.sync.common.AbstractSyncTool;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.schema.MessageType;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+/**
+ * Tool to sync a hoodie table with a DLA table. Either use it as an API,
+ * DLASyncTool.syncHoodieTable(DLASyncConfig), or from the command line: java -cp hoodie-hive.jar DLASyncTool [args].
+ * <p>
+ * This utility will get the schema from the latest commit and sync the DLA table schema. It will also sync the
+ * partitions incrementally (all the partitions modified since the last commit).
+ */
+@SuppressWarnings("WeakerAccess")
+public class DLASyncTool extends AbstractSyncTool {
+
+  private static final Logger LOG = LogManager.getLogger(DLASyncTool.class);
+  public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
+  public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
+
+  private final DLASyncConfig cfg;
+  private final HoodieDLAClient hoodieDLAClient;
+  private final String snapshotTableName;
+  private final Option<String> roTableTableName;
+
+  public DLASyncTool(Properties properties, FileSystem fs) {
+    super(properties, fs);
+    this.hoodieDLAClient = new HoodieDLAClient(Utils.propertiesToConfig(properties), fs);
+    this.cfg = Utils.propertiesToConfig(properties);
+    switch (hoodieDLAClient.getTableType()) {
+      case COPY_ON_WRITE:
+        this.snapshotTableName = cfg.tableName;
+        this.roTableTableName = Option.empty();
+        break;
+      case MERGE_ON_READ:
+        this.snapshotTableName = cfg.tableName + SUFFIX_SNAPSHOT_TABLE;
+        this.roTableTableName = cfg.skipROSuffix ? Option.of(cfg.tableName) :
+            Option.of(cfg.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
+        break;
+      default:
+        LOG.error("Unknown table type " + hoodieDLAClient.getTableType());
+        throw new InvalidTableException(hoodieDLAClient.getBasePath());
+    }
+  }
+
+  @Override
+  public void syncHoodieTable() {
+    try {
+      switch (hoodieDLAClient.getTableType()) {
+        case COPY_ON_WRITE:
+          syncHoodieTable(snapshotTableName, false);
+          break;
+        case MERGE_ON_READ:
+          // sync a RO table for MOR
+          syncHoodieTable(roTableTableName.get(), false);
+          // sync a RT table for MOR
+          syncHoodieTable(snapshotTableName, true);
+          break;
+        default:
+          LOG.error("Unknown table type " + hoodieDLAClient.getTableType());
+          throw new InvalidTableException(hoodieDLAClient.getBasePath());
+      }
+    } catch (RuntimeException re) {
+      LOG.error("Got runtime exception when dla syncing", re);
+    } finally {
+      hoodieDLAClient.close();
+    }
+  }
+
+  private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat) {
+    LOG.info("Trying to sync hoodie table " + tableName + " with base path " + hoodieDLAClient.getBasePath()
+        + " of type " + hoodieDLAClient.getTableType());
+    // Check if the necessary table exists
+    boolean tableExists = hoodieDLAClient.doesTableExist(tableName);
+    // Get the parquet schema for this table looking at the latest commit
+    MessageType schema = hoodieDLAClient.getDataSchema();
+    // Sync schema if needed
+    syncSchema(tableName, tableExists, useRealtimeInputFormat, schema);
+
+    LOG.info("Schema sync complete. Syncing partitions for " + tableName);
+    // Get the last time we successfully synced partitions
+    Option<String> lastCommitTimeSynced = Option.empty();

Review comment:
       We would add a TODO here once DLA supports altering table properties.
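
   For reference, a minimal sketch of what that could look like, mirroring the commented-out block in the quoted diff (all names are taken from that code):

   ```java
   // Sketch only: re-enable the last-synced lookup once DLA supports
   // altering table properties.
   Option<String> lastCommitTimeSynced = Option.empty();
   // TODO: uncomment once DLA supports altering table properties.
   // if (tableExists) {
   //   lastCommitTimeSynced = hoodieDLAClient.getLastCommitTimeSynced(tableName);
   // }
   ```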







[GitHub] [hudi] leesf closed pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
leesf closed pull request #1810:
URL: https://github.com/apache/hudi/pull/1810


   





[GitHub] [hudi] vinothchandar commented on pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#issuecomment-668367172


   > IMO, that will be great if we can remove the hive dependency from hudi-spark and hudi-utilities 
   
   We can discuss on the JIRA, but this needs more thought. We want the Spark datasource write and DeltaStreamer to continue syncing to Hive when the write completes. So it's a necessary thing, IMO.





[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r454974889



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -255,6 +262,43 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig
   }
 
+  private def metaSync(parameters: Map[String, String],
+                       basePath: Path,
+                       hadoopConf: Configuration): Boolean = {
+    val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var metaSyncEnabled = parameters.get(HUDI_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var syncClientToolClass = parameters.get(SYNC_CLIENT_TOOL_CLASS).get
+    // for backward compatibility
+    if (hiveSyncEnabled) {
+      metaSyncEnabled = true
+      syncClientToolClass = DEFAULT_SYNC_CLIENT_TOOL_CLASS

Review comment:
       Yes, this preserves backward compatibility.







[GitHub] [hudi] leesf commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
leesf commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r454969135



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -255,6 +262,43 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig
   }
 
+  private def metaSync(parameters: Map[String, String],
+                       basePath: Path,
+                       hadoopConf: Configuration): Boolean = {
+    val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var metaSyncEnabled = parameters.get(HUDI_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var syncClientToolClass = parameters.get(SYNC_CLIENT_TOOL_CLASS).get
+    // for backward compatibility
+    if (hiveSyncEnabled) {
+      metaSyncEnabled = true
+      syncClientToolClass = DEFAULT_SYNC_CLIENT_TOOL_CLASS

Review comment:
       If a user syncs to both the Hive and DLA meta stores, would the DLA meta not get synced?







[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r465449064



##########
File path: hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.dla;
+
+import com.beust.jcommander.JCommander;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.dla.util.Utils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.InvalidTableException;
+import org.apache.hudi.hadoop.HoodieParquetInputFormat;
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
+import org.apache.hudi.hive.SchemaDifference;
+import org.apache.hudi.hive.util.HiveSchemaUtil;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+import org.apache.hudi.sync.common.AbstractSyncTool;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.schema.MessageType;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+/**
+ * Tool to sync a hoodie table with a DLA table. Either use it as an API,
+ * DLASyncTool.syncHoodieTable(DLASyncConfig), or from the command line: java -cp hoodie-hive.jar DLASyncTool [args]
+ * <p>
+ * This utility will get the schema from the latest commit and sync the DLA table schema. It will also sync the
+ * partitions incrementally (all the partitions modified since the last commit).
+ */
+@SuppressWarnings("WeakerAccess")
+public class DLASyncTool extends AbstractSyncTool {
+
+  private static final Logger LOG = LogManager.getLogger(DLASyncTool.class);
+  public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
+  public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
+
+  private final DLASyncConfig cfg;
+  private final HoodieDLAClient hoodieDLAClient;
+  private final String snapshotTableName;
+  private final Option<String> roTableTableName;
+
+  public DLASyncTool(Properties properties, FileSystem fs) {
+    super(properties, fs);
+    this.hoodieDLAClient = new HoodieDLAClient(Utils.propertiesToConfig(properties), fs);
+    this.cfg = Utils.propertiesToConfig(properties);
+    switch (hoodieDLAClient.getTableType()) {
+      case COPY_ON_WRITE:
+        this.snapshotTableName = cfg.tableName;
+        this.roTableTableName = Option.empty();
+        break;
+      case MERGE_ON_READ:
+        this.snapshotTableName = cfg.tableName + SUFFIX_SNAPSHOT_TABLE;
+        this.roTableTableName = cfg.skipROSuffix ? Option.of(cfg.tableName) :
+            Option.of(cfg.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
+        break;
+      default:
+        LOG.error("Unknown table type " + hoodieDLAClient.getTableType());
+        throw new InvalidTableException(hoodieDLAClient.getBasePath());
+    }
+  }
+
+  @Override
+  public void syncHoodieTable() {
+    try {
+      switch (hoodieDLAClient.getTableType()) {
+        case COPY_ON_WRITE:
+          syncHoodieTable(snapshotTableName, false);
+          break;
+        case MERGE_ON_READ:
+          // sync a RO table for MOR
+          syncHoodieTable(roTableTableName.get(), false);
+          // sync a RT table for MOR
+          syncHoodieTable(snapshotTableName, true);
+          break;
+        default:
+          LOG.error("Unknown table type " + hoodieDLAClient.getTableType());
+          throw new InvalidTableException(hoodieDLAClient.getBasePath());
+      }
+    } catch (RuntimeException re) {
+      LOG.error("Got runtime exception when dla syncing", re);
+    } finally {
+      hoodieDLAClient.close();
+    }
+  }
+
+  private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat) {
+    LOG.info("Trying to sync hoodie table " + tableName + " with base path " + hoodieDLAClient.getBasePath()
+        + " of type " + hoodieDLAClient.getTableType());
+    // Check if the necessary table exists
+    boolean tableExists = hoodieDLAClient.doesTableExist(tableName);
+    // Get the parquet schema for this table looking at the latest commit
+    MessageType schema = hoodieDLAClient.getDataSchema();
+    // Sync schema if needed
+    syncSchema(tableName, tableExists, useRealtimeInputFormat, schema);
+
+    LOG.info("Schema sync complete. Syncing partitions for " + tableName);
+    // Get the last time we successfully synced partitions
+    Option<String> lastCommitTimeSynced = Option.empty();

Review comment:
       done







[GitHub] [hudi] leesf commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
leesf commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r452824160



##########
File path: hudi-sync/hudi-hive-sync/pom.xml
##########
@@ -43,6 +45,11 @@
       <artifactId>hudi-hadoop-mr</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-sync-common</artifactId>

Review comment:
       Hi, I am inclined to put the hudi-sync-common, hudi-sync-dla and hudi-sync-hive modules under hudi-sync; it will keep the structure the same as the refactored hudi-client, which consists of the hudi-client-common, hudi-client-spark, hudi-client-flink and hudi-client-java modules.
   
   ```
   hudi-sync
   |--hudi-sync-common
   |--hudi-sync-hive
   |--hudi-sync-dla
   
   
   hudi-client
   |--hudi-client-common
   |--hudi-client-java
   |--hudi-client-spark
   |--hudi-client-flink
   ```
   







[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r465450351



##########
File path: hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sync.common;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public abstract class AbstractSyncHoodieClient {
+  private static final Logger LOG = LogManager.getLogger(AbstractSyncHoodieClient.class);
+  protected final HoodieTableMetaClient metaClient;
+  protected HoodieTimeline activeTimeline;
+  protected final HoodieTableType tableType;
+  protected final FileSystem fs;
+  private String basePath;
+  private boolean assumeDatePartitioning;
+
+  public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, FileSystem fs) {
+    this.metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
+    this.tableType = metaClient.getTableType();
+    this.basePath = basePath;
+    this.assumeDatePartitioning = assumeDatePartitioning;
+    this.fs = fs;
+    this.activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+  }
+
+  public abstract void createTable(String tableName, MessageType storageSchema,
+                                   String inputFormatClass, String outputFormatClass, String serdeClass);
+
+  public abstract boolean doesTableExist(String tableName);
+
+  public abstract Option<String> getLastCommitTimeSynced(String tableName);
+
+  public abstract void updateLastCommitTimeSynced(String tableName);
+
+  public abstract void addPartitionsToTable(String tableName, List<String> partitionsToAdd);
+
+  public abstract void updatePartitionsToTable(String tableName, List<String> changedPartitions);
+
+  public abstract Map<String, String> getTableSchema(String tableName);
+
+  public HoodieTimeline getActiveTimeline() {
+    return activeTimeline;
+  }
+
+  public HoodieTableType getTableType() {
+    return tableType;
+  }
+
+  public String getBasePath() {
+    return metaClient.getBasePath();
+  }
+
+  public FileSystem getFs() {
+    return fs;
+  }
+
+  public void closeQuietly(ResultSet resultSet, Statement stmt) {
+    try {
+      if (stmt != null) {
+        stmt.close();
+      }
+    } catch (SQLException e) {
+      LOG.error("Could not close the statement opened ", e);
+    }
+
+    try {
+      if (resultSet != null) {
+        resultSet.close();
+      }
+    } catch (SQLException e) {
+      LOG.error("Could not close the resultset opened ", e);

Review comment:
       done







[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r465433316



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -261,6 +268,44 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig
   }
 
+  private def metaSync(parameters: Map[String, String],
+                       basePath: Path,
+                       hadoopConf: Configuration): Boolean = {
+    val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var metaSyncEnabled = parameters.get(HUDI_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)

Review comment:
       done







[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r454853182



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##########
@@ -237,6 +237,9 @@ public Operation convert(String value) throws ParameterException {
     @Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing to hive")
     public Boolean enableHiveSync = false;
 
+    @Parameter(names = {"--hoodie-sync-client-tool-class"}, description = "Meta sync client tool, using comma to separate multi tools")

Review comment:
       done







[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r453190335



##########
File path: hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sync.common;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public abstract class AbstractSyncHoodieClient {
+  private static final Logger LOG = LogManager.getLogger(AbstractSyncHoodieClient.class);
+  protected final HoodieTableMetaClient metaClient;
+  protected HoodieTimeline activeTimeline;
+  protected final HoodieTableType tableType;
+  protected final FileSystem fs;
+  private String basePath;
+  private boolean assumeDatePartitioning;
+
+  public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, FileSystem fs) {
+    this.metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
+    this.tableType = metaClient.getTableType();
+    this.basePath = basePath;
+    this.assumeDatePartitioning = assumeDatePartitioning;
+    this.fs = fs;
+    this.activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+  }
+
+  public abstract void createTable(String tableName, MessageType storageSchema,
+                                   String inputFormatClass, String outputFormatClass, String serdeClass);
+
+  public abstract boolean doesTableExist(String tableName);
+
+  public abstract Option<String> getLastCommitTimeSynced(String tableName);
+
+  public abstract void updateLastCommitTimeSynced(String tableName);
+
+  public abstract void addPartitionsToTable(String tableName, List<String> partitionsToAdd);
+
+  public abstract void updatePartitionsToTable(String tableName, List<String> changedPartitions);
+
+  public HoodieTimeline getActiveTimeline() {
+    return activeTimeline;
+  }
+
+  public HoodieTableType getTableType() {
+    return tableType;
+  }
+
+  public String getBasePath() {
+    return metaClient.getBasePath();
+  }
+
+  public FileSystem getFs() {
+    return fs;
+  }
+
+  public void closeQuietly(ResultSet resultSet, Statement stmt) {
+    try {
+      if (stmt != null) {
+        stmt.close();
+      }
+    } catch (SQLException e) {
+      LOG.error("Could not close the statement opened ", e);
+    }
+
+    try {
+      if (resultSet != null) {
+        resultSet.close();
+      }
+    } catch (SQLException e) {
+      LOG.error("Could not close the resultset opened ", e);
+    }
+  }
+
+  /**
+   * Gets the schema for a hoodie table. Depending on the type of table, try to read schema from commit metadata if
+   * present, else fallback to reading from any file written in the latest commit. We will assume that the schema has
+   * not changed within a single atomic write.
+   *
+   * @return Parquet schema for this table
+   */
+  public MessageType getDataSchema() {
+    try {
+      return new TableSchemaResolver(metaClient).getTableParquetSchema();
+    } catch (Exception e) {
+      throw new HoodieSyncException("Failed to read data schema", e);
+    }
+  }
+
+  @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+  public List<String> getPartitionsWrittenToSince(Option<String> lastCommitTimeSynced) {
+    if (!lastCommitTimeSynced.isPresent()) {
+      LOG.info("Last commit time synced is not known, listing all partitions in " + basePath + ",FS :" + fs);
+      try {
+        return FSUtils.getAllPartitionPaths(fs, basePath, assumeDatePartitioning);
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to list all partitions in " + basePath, e);
+      }
+    } else {
+      LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", Getting commits since then");
+
+      HoodieTimeline timelineToSync = activeTimeline.findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE);
+      return timelineToSync.getInstants().map(s -> {
+        try {
+          return HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(s).get(), HoodieCommitMetadata.class);
+        } catch (IOException e) {
+          throw new HoodieIOException("Failed to get partitions written since " + lastCommitTimeSynced, e);
+        }
+      }).flatMap(s -> s.getPartitionToWriteStats().keySet().stream()).distinct().collect(Collectors.toList());
+    }
+  }
+
+  private MessageType readSchemaFromLastCompaction(Option<HoodieInstant> lastCompactionCommitOpt) throws IOException {

Review comment:
       done







[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r465458394



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -258,11 +258,14 @@ object DataSourceWriteOptions {
     */
   val STREAMING_IGNORE_FAILED_BATCH_OPT_KEY = "hoodie.datasource.write.streaming.ignore.failed.batch"
   val DEFAULT_STREAMING_IGNORE_FAILED_BATCH_OPT_VAL = "true"
+  val SYNC_CLIENT_TOOL_CLASS = "hoodie.sync.client.tool.class"
+  val DEFAULT_SYNC_CLIENT_TOOL_CLASS = "org.apache.hudi.hive.HiveSyncTool"

Review comment:
       done







[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r465503124



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
##########
@@ -67,10 +77,15 @@ String getMetricsName(String action, String metric) {
     return config == null ? null : String.format("%s.%s.%s", tableName, action, metric);
   }
 
-  public void updateDeltaStreamerMetrics(long durationInNs, long hiveSyncNs) {
+  public void updateDeltaStreamerMetrics(long durationInNs) {
     if (config.isMetricsOn()) {
       Metrics.registerGauge(getMetricsName("deltastreamer", "duration"), getDurationInMs(durationInNs));
-      Metrics.registerGauge(getMetricsName("deltastreamer", "hiveSyncDuration"), getDurationInMs(hiveSyncNs));
+    }
+  }
+
+  public void updateDeltaStreamerMetaSyncMetrics(String syncClassName, long syncNs) {

Review comment:
       done







[GitHub] [hudi] lw309637554 commented on pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#issuecomment-658740024


   @vinothchandar @garyli1019  I think the hudi-sync abstraction in this PR is ready. Expecting your review. Thanks.
   
   1. The module structure is:
   hudi-sync
        hudi-hive-sync/
        hudi-dla-sync/
   
   2. About backwards compatibility:
   a. User code: since the hive sync class names are not modified, users do not need to change their old code.
   b. pom dependencies: since the module name hudi-hive-sync is not modified, users do not need to change their pom dependencies.
   c. Local jar files: if users' local jars do not shade hudi-hive-sync's indirect dependencies, old users just need to add hudi-sync-common.jar to their directory, like the hudi-utilities-bundle change in this PR.
   d. Sync parameters: new parameters such as --enable-sync are added as the new defaults, while staying backwards compatible with --enable-hive-sync.
   
   3. Some other work, such as updating the docs, will be done in other issues.
   
   What do you think?





[GitHub] [hudi] leesf commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
leesf commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r452827668



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##########
@@ -237,6 +237,9 @@ public Operation convert(String value) throws ParameterException {
     @Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing to hive")
     public Boolean enableHiveSync = false;
 
+    @Parameter(names = {"--hoodie-sync-client-tool-class"}, description = "Meta sync client tool, using comma to separate multi tools")

Review comment:
       Yes, `--enable-sync` is reasonable since hudi will not only support syncing to Hive but also to other meta stores @garyli1019. But as @lw309637554 pointed out, compatibility is a problem here, so the current solution is a compromise. I am OK with renaming to `--enable-sync` and documenting the breaking change here.







[GitHub] [hudi] garyli1019 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
garyli1019 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r453852987



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##########
@@ -237,6 +237,9 @@ public Operation convert(String value) throws ParameterException {
     @Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing to hive")
     public Boolean enableHiveSync = false;
 
+    @Parameter(names = {"--hoodie-sync-client-tool-class"}, description = "Meta sync client tool, using comma to separate multi tools")

Review comment:
       sounds good to me







[GitHub] [hudi] leesf commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
leesf commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r465418366



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -258,11 +258,14 @@ object DataSourceWriteOptions {
     */
   val STREAMING_IGNORE_FAILED_BATCH_OPT_KEY = "hoodie.datasource.write.streaming.ignore.failed.batch"
   val DEFAULT_STREAMING_IGNORE_FAILED_BATCH_OPT_VAL = "true"
+  val SYNC_CLIENT_TOOL_CLASS = "hoodie.sync.client.tool.class"
+  val DEFAULT_SYNC_CLIENT_TOOL_CLASS = "org.apache.hudi.hive.HiveSyncTool"

Review comment:
       use HiveSyncTool.class.getName?
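
   (For DataSourceOptions.scala, which is Scala, the equivalent would be `classOf[HiveSyncTool].getName`; note this assumes hudi-spark has a compile-time dependency on the hudi-hive-sync module.)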







[GitHub] [hudi] leesf commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
leesf commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r465417053



##########
File path: hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sync.common;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public abstract class AbstractSyncHoodieClient {
+  private static final Logger LOG = LogManager.getLogger(AbstractSyncHoodieClient.class);
+  protected final HoodieTableMetaClient metaClient;
+  protected HoodieTimeline activeTimeline;
+  protected final HoodieTableType tableType;
+  protected final FileSystem fs;
+  private String basePath;
+  private boolean assumeDatePartitioning;
+
+  public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, FileSystem fs) {
+    this.metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
+    this.tableType = metaClient.getTableType();
+    this.basePath = basePath;
+    this.assumeDatePartitioning = assumeDatePartitioning;
+    this.fs = fs;
+    this.activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+  }
+
+  public abstract void createTable(String tableName, MessageType storageSchema,
+                                   String inputFormatClass, String outputFormatClass, String serdeClass);
+
+  public abstract boolean doesTableExist(String tableName);
+
+  public abstract Option<String> getLastCommitTimeSynced(String tableName);
+
+  public abstract void updateLastCommitTimeSynced(String tableName);
+
+  public abstract void addPartitionsToTable(String tableName, List<String> partitionsToAdd);
+
+  public abstract void updatePartitionsToTable(String tableName, List<String> changedPartitions);
+
+  public abstract Map<String, String> getTableSchema(String tableName);
+
+  public HoodieTimeline getActiveTimeline() {
+    return activeTimeline;
+  }
+
+  public HoodieTableType getTableType() {
+    return tableType;
+  }
+
+  public String getBasePath() {
+    return metaClient.getBasePath();
+  }
+
+  public FileSystem getFs() {
+    return fs;
+  }
+
+  public void closeQuietly(ResultSet resultSet, Statement stmt) {
+    try {
+      if (stmt != null) {
+        stmt.close();
+      }
+    } catch (SQLException e) {
+      LOG.error("Could not close the statement opened ", e);

Review comment:
       Let's change this to warn.







[GitHub] [hudi] lw309637554 commented on pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#issuecomment-663442107


   > couple
   
   okay, thanks





[GitHub] [hudi] vinothchandar commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r464805558



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##########
@@ -267,9 +267,16 @@ public Operation convert(String value) throws ParameterException {
         description = "Should duplicate records from source be dropped/filtered out before insert/bulk-insert")
     public Boolean filterDupes = false;
 
+    //will abandon in the future version, recommended use --enable-sync

Review comment:
       Can we print a warning around this, so the user knows?
   
   Here's my take: we can change the code so that --enable-sync and --sync-tool-class-list are the main drivers, out of which we derive a `Set<String>` denoting all the sync tool classes. If --enable-hive-sync is specified, then we simply add the hive sync tool class to this set; the rest of the code just syncs to all the sync tools in this set.
   
   This way, `--enable-hive-sync` stays isolated to the initial command-line parsing code (see the sketch below). We can apply the same method to the datasource as well, if you don't see issues. @lw309637554 @leesf wdyt?
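
   A minimal sketch of that parsing step (the method and parameter names here are illustrative, not from the patch):

   ```java
   import java.util.Arrays;
   import java.util.LinkedHashSet;
   import java.util.Set;
   import java.util.stream.Collectors;

   // Illustrative only: derive the full set of sync tool classes once,
   // during command-line parsing; the rest of the code iterates this set.
   static Set<String> resolveSyncToolClasses(boolean enableSync,
                                             boolean enableHiveSync,
                                             String syncToolClassList) {
     Set<String> toolClasses = new LinkedHashSet<>();
     if (enableSync) {
       toolClasses.addAll(Arrays.stream(syncToolClassList.split(","))
           .map(String::trim)
           .filter(s -> !s.isEmpty())
           .collect(Collectors.toList()));
     }
     // --enable-hive-sync is handled only here, by adding the hive tool class.
     if (enableHiveSync) {
       toolClasses.add("org.apache.hudi.hive.HiveSyncTool");
     }
     return toolClasses;
   }
   ```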

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -261,6 +268,43 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig
   }
 
+  private def metaSync(parameters: Map[String, String],
+                       basePath: Path,
+                       hadoopConf: Configuration): Boolean = {
+    val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var metaSyncEnabled = parameters.get(HUDI_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var syncClientToolClass = parameters.get(SYNC_CLIENT_TOOL_CLASS).get
+    // for backward compatibility
+    if (hiveSyncEnabled) {
+      metaSyncEnabled = true
+      syncClientToolClass = DEFAULT_SYNC_CLIENT_TOOL_CLASS
+    }
+    var metaSyncSuccess = true
+    if (metaSyncEnabled) {
+      val impls = syncClientToolClass.split(",")
+      impls.foreach(impl => {
+        val syncSuccess = impl.trim match {
+          case DEFAULT_SYNC_CLIENT_TOOL_CLASS => {
+            log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
+            val fs = FSUtils.getFs(basePath.toString, hadoopConf)
+            syncHive(basePath, fs, parameters)
+          }
+          case _ => {
+            val fs = FSUtils.getFs(basePath.toString, hadoopConf)

Review comment:
       Can this line be shared, i.e. the `fs` initialization?

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -261,6 +268,43 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig
   }
 
+  private def metaSync(parameters: Map[String, String],
+                       basePath: Path,
+                       hadoopConf: Configuration): Boolean = {
+    val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var metaSyncEnabled = parameters.get(HUDI_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var syncClientToolClass = parameters.get(SYNC_CLIENT_TOOL_CLASS).get
+    // for backward compatibility
+    if (hiveSyncEnabled) {
+      metaSyncEnabled = true
+      syncClientToolClass = DEFAULT_SYNC_CLIENT_TOOL_CLASS
+    }
+    var metaSyncSuccess = true
+    if (metaSyncEnabled) {
+      val impls = syncClientToolClass.split(",")
+      impls.foreach(impl => {
+        val syncSuccess = impl.trim match {
+          case DEFAULT_SYNC_CLIENT_TOOL_CLASS => {
+            log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")

Review comment:
       Can we explicitly match for Hive instead of the default? We may change the default at some point, and then it would be an issue.

##########
File path: packaging/hudi-hive-sync-bundle/pom.xml
##########
@@ -66,7 +66,8 @@
                 <includes>
                   <include>org.apache.hudi:hudi-common</include>
                   <include>org.apache.hudi:hudi-hadoop-mr</include>
-                  <include>org.apache.hudi:hudi-hive-sync</include>
+                  <include>org.apache.hudi:hudi-sync-common</include>
+		  <include>org.apache.hudi:hudi-hive-sync</include>

Review comment:
       nit: fix indentation

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -442,7 +449,8 @@ private void refreshTimeline() throws IOException {
     long overallTimeMs = overallTimerContext != null ? overallTimerContext.stop() : 0;
 
     // Send DeltaStreamer Metrics
-    metrics.updateDeltaStreamerMetrics(overallTimeMs, hiveSyncTimeMs);
+    metrics.updateDeltaStreamerMetrics(overallTimeMs, hiveSyncTimeMs, true);
+    metrics.updateDeltaStreamerMetrics(overallTimeMs, metaSyncTimeMs, false);

Review comment:
       Is there a way to do this by iterating over the configured sync tool classes, i.e. only emit it when sync is configured?

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
##########
@@ -67,10 +77,15 @@ String getMetricsName(String action, String metric) {
     return config == null ? null : String.format("%s.%s.%s", tableName, action, metric);
   }
 
-  public void updateDeltaStreamerMetrics(long durationInNs, long hiveSyncNs) {
+  public void updateDeltaStreamerMetrics(long durationInNs, long syncNs, boolean hiveSync) {
     if (config.isMetricsOn()) {
       Metrics.registerGauge(getMetricsName("deltastreamer", "duration"), getDurationInMs(durationInNs));
-      Metrics.registerGauge(getMetricsName("deltastreamer", "hiveSyncDuration"), getDurationInMs(hiveSyncNs));
+      if (hiveSync) {
+        Metrics.registerGauge(getMetricsName("deltastreamer", "hiveSyncDuration"), getDurationInMs(syncNs));
+      } else {
+        Metrics.registerGauge(getMetricsName("deltastreamer", "metaSyncDuration"), getDurationInMs(syncNs));

Review comment:
       What if both hive and meta sync are off? Would we still emit metrics for meta?

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##########
@@ -237,6 +237,9 @@ public Operation convert(String value) throws ParameterException {
     @Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing to hive")
     public Boolean enableHiveSync = false;
 
+    @Parameter(names = {"--hoodie-sync-client-tool-class"}, description = "Meta sync client tool, using comma to separate multi tools")

Review comment:
       I think we can make this name a little shorter: `--sync-tool-class-list`?

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -255,6 +262,43 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig
   }
 
+  private def metaSync(parameters: Map[String, String],
+                       basePath: Path,
+                       hadoopConf: Configuration): Boolean = {
+    val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var metaSyncEnabled = parameters.get(HUDI_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var syncClientToolClass = parameters.get(SYNC_CLIENT_TOOL_CLASS).get
+    // for backward compatibility
+    if (hiveSyncEnabled) {
+      metaSyncEnabled = true
+      syncClientToolClass = DEFAULT_SYNC_CLIENT_TOOL_CLASS

Review comment:
       Would users sync to both? Down the line it may make sense to provide support for syncing to multiple things.
   
   But even here, if we just append the HiveSync class when `hiveSyncEnabled=true`, we can support syncing to both Hive and DLA?

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
##########
@@ -67,10 +77,15 @@ String getMetricsName(String action, String metric) {
     return config == null ? null : String.format("%s.%s.%s", tableName, action, metric);
   }
 
-  public void updateDeltaStreamerMetrics(long durationInNs, long hiveSyncNs) {
+  public void updateDeltaStreamerMetrics(long durationInNs, long syncNs, boolean hiveSync) {
     if (config.isMetricsOn()) {
       Metrics.registerGauge(getMetricsName("deltastreamer", "duration"), getDurationInMs(durationInNs));
-      Metrics.registerGauge(getMetricsName("deltastreamer", "hiveSyncDuration"), getDurationInMs(hiveSyncNs));
+      if (hiveSync) {
+        Metrics.registerGauge(getMetricsName("deltastreamer", "hiveSyncDuration"), getDurationInMs(syncNs));
+      } else {
+        Metrics.registerGauge(getMetricsName("deltastreamer", "metaSyncDuration"), getDurationInMs(syncNs));

Review comment:
       Should we derive the metric name from the sync tool class, i.e. instead of `metaSyncDuration`, emit `dlaSyncDuration`? That seems more usable and understandable.
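
   Something along these lines, as a sketch (the method name is illustrative; `getMetricsName` and `getDurationInMs` are from the quoted code):

   ```java
   // Sketch: derive the gauge name from the sync tool class, so that
   // HiveSyncTool -> hiveSyncDuration, DLASyncTool -> dLASyncDuration, etc.
   public void updateDeltaStreamerSyncMetrics(String syncToolClassName, long syncNs) {
     if (config.isMetricsOn()) {
       String simpleName = syncToolClassName.substring(syncToolClassName.lastIndexOf('.') + 1);
       String gauge = Character.toLowerCase(simpleName.charAt(0))
           + simpleName.substring(1).replace("Tool", "Duration");
       Metrics.registerGauge(getMetricsName("deltastreamer", gauge), getDurationInMs(syncNs));
     }
   }
   ```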







[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r452166076



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##########
@@ -237,6 +237,9 @@ public Operation convert(String value) throws ParameterException {
     @Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing to hive")
     public Boolean enableHiveSync = false;
 
+    @Parameter(names = {"--hoodie-sync-client-tool-class"}, description = "Meta sync client tool, using comma to separate multi tools")

Review comment:
       This place is worth discussing. For compatibility, we just add a --hoodie-sync-client-tool-class.
   But I think changing --enable-hive-sync to --enable-sync is reasonable, or we could just add a new --enable-sync parameter and keep the --enable-hive-sync parameter compatible for old users. cc @vinothchandar @leesf







[GitHub] [hudi] leesf commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
leesf commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r465417539



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -475,12 +480,38 @@ private String startCommit() {
     throw lastException;
   }
 
-  /**
-   * Sync to Hive.
-   */
-  public void syncHiveIfNeeded() {
+  private void syncMeta(HoodieDeltaStreamerMetrics metrics) {
+    String syncClientToolClass = cfg.syncClientToolClass;
+    // for backward compatibility
     if (cfg.enableHiveSync) {
-      syncHive();
+      cfg.enableMetaSync = true;
+      syncClientToolClass = String.format("%s,%s", cfg.syncClientToolClass, "org.apache.hudi.hive.HiveSyncTool");

Review comment:
       use HiveSyncTool.class.getName here?
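
   i.e., something like this sketch, assuming DeltaSync can import HiveSyncTool:

   ```java
   // Avoids the hardcoded class name string from the quoted diff.
   syncClientToolClass = String.format("%s,%s", cfg.syncClientToolClass, HiveSyncTool.class.getName());
   ```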







[GitHub] [hudi] leesf commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
leesf commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r454973961



##########
File path: hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.dla;
+
+import com.beust.jcommander.JCommander;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.dla.util.Utils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.InvalidTableException;
+import org.apache.hudi.hadoop.HoodieParquetInputFormat;
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
+import org.apache.hudi.hive.SchemaDifference;
+import org.apache.hudi.hive.util.HiveSchemaUtil;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+import org.apache.hudi.sync.common.AbstractSyncTool;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.schema.MessageType;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+/**
+ * Tool to sync a hoodie table with a dla table. Either use it as a api
+ * DLASyncTool.syncHoodieTable(DLASyncConfig) or as a command line java -cp hoodie-hive.jar DLASyncTool [args]
+ * <p>
+ * This utility will get the schema from the latest commit and will sync dla table schema Also this will sync the
+ * partitions incrementally (all the partitions modified since the last commit)
+ */
+@SuppressWarnings("WeakerAccess")
+public class DLASyncTool extends AbstractSyncTool {
+
+  private static final Logger LOG = LogManager.getLogger(DLASyncTool.class);
+  public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
+  public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
+
+  private final DLASyncConfig cfg;
+  private final HoodieDLAClient hoodieDLAClient;
+  private final String snapshotTableName;
+  private final Option<String> roTableTableName;
+
+  public DLASyncTool(Properties properties, FileSystem fs) {
+    super(properties, fs);
+    this.hoodieDLAClient = new HoodieDLAClient(Utils.propertiesToConfig(properties), fs);
+    this.cfg = Utils.propertiesToConfig(properties);
+    switch (hoodieDLAClient.getTableType()) {
+      case COPY_ON_WRITE:
+        this.snapshotTableName = cfg.tableName;
+        this.roTableTableName = Option.empty();
+        break;
+      case MERGE_ON_READ:
+        this.snapshotTableName = cfg.tableName + SUFFIX_SNAPSHOT_TABLE;
+        this.roTableTableName = cfg.skipROSuffix ? Option.of(cfg.tableName) :
+            Option.of(cfg.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
+        break;
+      default:
+        LOG.error("Unknown table type " + hoodieDLAClient.getTableType());
+        throw new InvalidTableException(hoodieDLAClient.getBasePath());
+    }
+  }
+
+  @Override
+  public void syncHoodieTable() {
+    try {
+      switch (hoodieDLAClient.getTableType()) {
+        case COPY_ON_WRITE:
+          syncHoodieTable(snapshotTableName, false);
+          break;
+        case MERGE_ON_READ:
+          // sync a RO table for MOR
+          syncHoodieTable(roTableTableName.get(), false);
+          // sync a RT table for MOR
+          syncHoodieTable(snapshotTableName, true);
+          break;
+        default:
+          LOG.error("Unknown table type " + hoodieDLAClient.getTableType());
+          throw new InvalidTableException(hoodieDLAClient.getBasePath());
+      }
+    } catch (RuntimeException re) {
+      LOG.error("Got runtime exception when dla syncing", re);
+    } finally {
+      hoodieDLAClient.close();
+    }
+  }
+
+  private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat) {
+    LOG.info("Trying to sync hoodie table " + tableName + " with base path " + hoodieDLAClient.getBasePath()
+        + " of type " + hoodieDLAClient.getTableType());
+    // Check if the necessary table exists
+    boolean tableExists = hoodieDLAClient.doesTableExist(tableName);
+    // Get the parquet schema for this table looking at the latest commit
+    MessageType schema = hoodieDLAClient.getDataSchema();
+    // Sync schema if needed
+    syncSchema(tableName, tableExists, useRealtimeInputFormat, schema);
+
+    LOG.info("Schema sync complete. Syncing partitions for " + tableName);
+    // Get the last time we successfully synced partitions
+    Option<String> lastCommitTimeSynced = Option.empty();
+    /*if (tableExists) {
+      lastCommitTimeSynced = hoodieDLAClient.getLastCommitTimeSynced(tableName);
+    }*/
+    LOG.info("Last commit time synced was found to be " + lastCommitTimeSynced.orElse("null"));

Review comment:
       since DLA meta does not support altering table properties, it would be simpler here
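
    For illustration, a minimal Java sketch (not the committed code) of what
    this simplification amounts to, using the names from the diff above:

        // With no persisted hoodie_last_sync marker, the sync always starts
        // from an empty Option, so getPartitionsWrittenToSince (defined in
        // AbstractSyncHoodieClient) falls back to listing every partition
        // under the base path instead of scanning commits incrementally.
        Option<String> lastCommitTimeSynced = Option.empty();
        List<String> writtenPartitions =
            hoodieDLAClient.getPartitionsWrittenToSince(lastCommitTimeSynced);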

##########
File path: hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.dla;
+
+import com.beust.jcommander.JCommander;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.dla.util.Utils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.InvalidTableException;
+import org.apache.hudi.hadoop.HoodieParquetInputFormat;
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
+import org.apache.hudi.hive.SchemaDifference;
+import org.apache.hudi.hive.util.HiveSchemaUtil;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+import org.apache.hudi.sync.common.AbstractSyncTool;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.schema.MessageType;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+/**
+ * Tool to sync a hoodie table with a dla table. Either use it as a api
+ * DLASyncTool.syncHoodieTable(DLASyncConfig) or as a command line java -cp hoodie-hive.jar DLASyncTool [args]
+ * <p>
+ * This utility will get the schema from the latest commit and will sync dla table schema Also this will sync the
+ * partitions incrementally (all the partitions modified since the last commit)
+ */
+@SuppressWarnings("WeakerAccess")
+public class DLASyncTool extends AbstractSyncTool {
+
+  private static final Logger LOG = LogManager.getLogger(DLASyncTool.class);
+  public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
+  public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
+
+  private final DLASyncConfig cfg;
+  private final HoodieDLAClient hoodieDLAClient;
+  private final String snapshotTableName;
+  private final Option<String> roTableTableName;
+
+  public DLASyncTool(Properties properties, FileSystem fs) {
+    super(properties, fs);
+    this.hoodieDLAClient = new HoodieDLAClient(Utils.propertiesToConfig(properties), fs);
+    this.cfg = Utils.propertiesToConfig(properties);
+    switch (hoodieDLAClient.getTableType()) {
+      case COPY_ON_WRITE:
+        this.snapshotTableName = cfg.tableName;
+        this.roTableTableName = Option.empty();
+        break;
+      case MERGE_ON_READ:
+        this.snapshotTableName = cfg.tableName + SUFFIX_SNAPSHOT_TABLE;
+        this.roTableTableName = cfg.skipROSuffix ? Option.of(cfg.tableName) :
+            Option.of(cfg.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
+        break;
+      default:
+        LOG.error("Unknown table type " + hoodieDLAClient.getTableType());
+        throw new InvalidTableException(hoodieDLAClient.getBasePath());
+    }
+  }
+
+  @Override
+  public void syncHoodieTable() {
+    try {
+      switch (hoodieDLAClient.getTableType()) {
+        case COPY_ON_WRITE:
+          syncHoodieTable(snapshotTableName, false);
+          break;
+        case MERGE_ON_READ:
+          // sync a RO table for MOR
+          syncHoodieTable(roTableTableName.get(), false);
+          // sync a RT table for MOR
+          syncHoodieTable(snapshotTableName, true);
+          break;
+        default:
+          LOG.error("Unknown table type " + hoodieDLAClient.getTableType());
+          throw new InvalidTableException(hoodieDLAClient.getBasePath());
+      }
+    } catch (RuntimeException re) {
+      LOG.error("Got runtime exception when dla syncing", re);
+    } finally {
+      hoodieDLAClient.close();
+    }
+  }
+
+  private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat) {
+    LOG.info("Trying to sync hoodie table " + tableName + " with base path " + hoodieDLAClient.getBasePath()
+        + " of type " + hoodieDLAClient.getTableType());
+    // Check if the necessary table exists
+    boolean tableExists = hoodieDLAClient.doesTableExist(tableName);
+    // Get the parquet schema for this table looking at the latest commit
+    MessageType schema = hoodieDLAClient.getDataSchema();
+    // Sync schema if needed
+    syncSchema(tableName, tableExists, useRealtimeInputFormat, schema);
+
+    LOG.info("Schema sync complete. Syncing partitions for " + tableName);
+    // Get the last time we successfully synced partitions
+    Option<String> lastCommitTimeSynced = Option.empty();
+    /*if (tableExists) {
+      lastCommitTimeSynced = hoodieDLAClient.getLastCommitTimeSynced(tableName);
+    }*/
+    LOG.info("Last commit time synced was found to be " + lastCommitTimeSynced.orElse("null"));

Review comment:
       since DLA meta does not support altering table properties yet, it would be simpler here




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] leesf commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
leesf commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r465417053



##########
File path: hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sync.common;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public abstract class AbstractSyncHoodieClient {
+  private static final Logger LOG = LogManager.getLogger(AbstractSyncHoodieClient.class);
+  protected final HoodieTableMetaClient metaClient;
+  protected HoodieTimeline activeTimeline;
+  protected final HoodieTableType tableType;
+  protected final FileSystem fs;
+  private String basePath;
+  private boolean assumeDatePartitioning;
+
+  public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, FileSystem fs) {
+    this.metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
+    this.tableType = metaClient.getTableType();
+    this.basePath = basePath;
+    this.assumeDatePartitioning = assumeDatePartitioning;
+    this.fs = fs;
+    this.activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+  }
+
+  public abstract void createTable(String tableName, MessageType storageSchema,
+                                   String inputFormatClass, String outputFormatClass, String serdeClass);
+
+  public abstract boolean doesTableExist(String tableName);
+
+  public abstract Option<String> getLastCommitTimeSynced(String tableName);
+
+  public abstract void updateLastCommitTimeSynced(String tableName);
+
+  public abstract void addPartitionsToTable(String tableName, List<String> partitionsToAdd);
+
+  public abstract void updatePartitionsToTable(String tableName, List<String> changedPartitions);
+
+  public abstract Map<String, String> getTableSchema(String tableName);
+
+  public HoodieTimeline getActiveTimeline() {
+    return activeTimeline;
+  }
+
+  public HoodieTableType getTableType() {
+    return tableType;
+  }
+
+  public String getBasePath() {
+    return metaClient.getBasePath();
+  }
+
+  public FileSystem getFs() {
+    return fs;
+  }
+
+  public void closeQuietly(ResultSet resultSet, Statement stmt) {
+    try {
+      if (stmt != null) {
+        stmt.close();
+      }
+    } catch (SQLException e) {
+      LOG.error("Could not close the statement opened ", e);

Review comment:
       please change to warn
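
    A minimal sketch of the suggested change, keeping the structure of the
    closeQuietly diff above:

        // Failing to close a statement is not fatal for the sync run,
        // so log it at warn level instead of error.
        try {
          if (stmt != null) {
            stmt.close();
          }
        } catch (SQLException e) {
          LOG.warn("Could not close the statement opened ", e);
        }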

##########
File path: hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sync.common;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public abstract class AbstractSyncHoodieClient {
+  private static final Logger LOG = LogManager.getLogger(AbstractSyncHoodieClient.class);
+  protected final HoodieTableMetaClient metaClient;
+  protected HoodieTimeline activeTimeline;
+  protected final HoodieTableType tableType;
+  protected final FileSystem fs;
+  private String basePath;
+  private boolean assumeDatePartitioning;
+
+  public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, FileSystem fs) {
+    this.metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
+    this.tableType = metaClient.getTableType();
+    this.basePath = basePath;
+    this.assumeDatePartitioning = assumeDatePartitioning;
+    this.fs = fs;
+    this.activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+  }
+
+  public abstract void createTable(String tableName, MessageType storageSchema,
+                                   String inputFormatClass, String outputFormatClass, String serdeClass);
+
+  public abstract boolean doesTableExist(String tableName);
+
+  public abstract Option<String> getLastCommitTimeSynced(String tableName);
+
+  public abstract void updateLastCommitTimeSynced(String tableName);
+
+  public abstract void addPartitionsToTable(String tableName, List<String> partitionsToAdd);
+
+  public abstract void updatePartitionsToTable(String tableName, List<String> changedPartitions);
+
+  public abstract Map<String, String> getTableSchema(String tableName);
+
+  public HoodieTimeline getActiveTimeline() {
+    return activeTimeline;
+  }
+
+  public HoodieTableType getTableType() {
+    return tableType;
+  }
+
+  public String getBasePath() {
+    return metaClient.getBasePath();
+  }
+
+  public FileSystem getFs() {
+    return fs;
+  }
+
+  public void closeQuietly(ResultSet resultSet, Statement stmt) {
+    try {
+      if (stmt != null) {
+        stmt.close();
+      }
+    } catch (SQLException e) {
+      LOG.error("Could not close the statement opened ", e);
+    }
+
+    try {
+      if (resultSet != null) {
+        resultSet.close();
+      }
+    } catch (SQLException e) {
+      LOG.error("Could not close the resultset opened ", e);

Review comment:
       ditto




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r465228726



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -261,6 +268,44 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig
   }
 
+  private def metaSync(parameters: Map[String, String],
+                       basePath: Path,
+                       hadoopConf: Configuration): Boolean = {
+    val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var metaSyncEnabled = parameters.get(HUDI_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var syncClientToolClass = parameters(SYNC_CLIENT_TOOL_CLASS)
+    // for backward compatibility
+    if (hiveSyncEnabled) {
+      metaSyncEnabled = true
+      syncClientToolClass = String.format("%s,%s", syncClientToolClass, "org.apache.hudi.hive.HiveSyncTool")

Review comment:
       if someone does `hiveSyncEnabled == true && metaSyncEnabled == true && syncClientToolClass = org.apache.hudi.hive.HiveSyncTool`, we will sync twice? can we just use a set to hold the classes?
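
    One way to read that suggestion, sketched in Java for illustration (the
    actual code in HoodieSparkSqlWriter is Scala; the method and variable
    names here are assumptions, not the merged change):

        import java.util.Arrays;
        import java.util.LinkedHashSet;
        import java.util.Set;

        // Fold the legacy hive-sync flag into one de-duplicated set of sync
        // tool classes, so listing HiveSyncTool explicitly cannot trigger a
        // second sync. LinkedHashSet preserves the user's ordering.
        static Set<String> resolveSyncToolClasses(String syncClientToolClass,
                                                  boolean hiveSyncEnabled) {
          Set<String> classes =
              new LinkedHashSet<>(Arrays.asList(syncClientToolClass.split(",")));
          if (hiveSyncEnabled) {
            classes.add("org.apache.hudi.hive.HiveSyncTool");
          }
          return classes;
        }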

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -261,6 +268,44 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig
   }
 
+  private def metaSync(parameters: Map[String, String],
+                       basePath: Path,
+                       hadoopConf: Configuration): Boolean = {
+    val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var metaSyncEnabled = parameters.get(HUDI_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)

Review comment:
       actually I like META_SYNC better here; it is more meaningful. wdyt?

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##########
@@ -267,9 +267,16 @@ public Operation convert(String value) throws ParameterException {
         description = "Should duplicate records from source be dropped/filtered out before insert/bulk-insert")
     public Boolean filterDupes = false;
 
+    //will abandon in the future version, recommended use --enable-sync
     @Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing to hive")
     public Boolean enableHiveSync = false;
 
+    @Parameter(names = {"--enable-sync"}, description = "Enable syncing meta")
+    public Boolean enableMetaSync = false;
+
+    @Parameter(names = {"--sync-tool-classes"}, description = "Meta sync client tool, using comma to separate multi tools")
+    public String syncClientToolClass = "org.apache.hudi.hive.HiveSyncTool";

Review comment:
       let's do `HiveSyncTool.class.getName()` or something?
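
    i.e., a hedged sketch of the suggested default (assuming HiveSyncTool is
    importable from this module):

        @Parameter(names = {"--sync-tool-classes"},
            description = "Meta sync client tool, using comma to separate multi tools")
        public String syncClientToolClass = HiveSyncTool.class.getName();
        // avoids hardcoding the fully qualified class name as a string literal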




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r452160698



##########
File path: hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
##########
@@ -272,6 +268,7 @@ void createTable(String tableName, MessageType storageSchema, String inputFormat
   /**
    * Get the table schema.
    */
+  //???? overwrite

Review comment:
       agree with you. we can put the abstract method getTableSchema into the base sync client.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#issuecomment-668392214


   @lw309637554 I rebased this off master and also did some of the smaller stuff myself. if we can make a call on the multiple targets and the metrics questions, we can resolve and land this 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r465448424



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -261,6 +268,44 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig
   }
 
+  private def metaSync(parameters: Map[String, String],
+                       basePath: Path,
+                       hadoopConf: Configuration): Boolean = {
+    val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var metaSyncEnabled = parameters.get(HUDI_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var syncClientToolClass = parameters(SYNC_CLIENT_TOOL_CLASS)
+    // for backward compatibility
+    if (hiveSyncEnabled) {
+      metaSyncEnabled = true
+      syncClientToolClass = String.format("%s,%s", syncClientToolClass, "org.apache.hudi.hive.HiveSyncTool")

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r465449225



##########
File path: hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java
##########
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.dla;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hive.HiveSyncConfig;
+import org.apache.hudi.hive.HoodieHiveSyncException;
+import org.apache.hudi.hive.PartitionValueExtractor;
+import org.apache.hudi.hive.SchemaDifference;
+import org.apache.hudi.hive.util.HiveSchemaUtil;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class HoodieDLAClient extends AbstractSyncHoodieClient {
+  private static final Logger LOG = LogManager.getLogger(HoodieDLAClient.class);
+  private static final String HOODIE_LAST_COMMIT_TIME_SYNC = "hoodie_last_sync";
+  // Make sure we have the dla JDBC driver in classpath
+  private static final String DRIVER_NAME = "com.mysql.jdbc.Driver";
+  private static final String DLA_ESCAPE_CHARACTER = "";
+  private static final String TBL_PROPERTIES_STR = "TBLPROPERTIES";
+
+  static {
+    try {
+      Class.forName(DRIVER_NAME);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException("Could not find " + DRIVER_NAME + " in classpath. ", e);
+    }
+  }
+
+  private Connection connection;
+  private DLASyncConfig dlaConfig;
+  private PartitionValueExtractor partitionValueExtractor;
+
+  public HoodieDLAClient(DLASyncConfig syncConfig, FileSystem fs) {
+    super(syncConfig.basePath, syncConfig.assumeDatePartitioning, fs);
+    this.dlaConfig = syncConfig;
+    try {
+      this.partitionValueExtractor =
+          (PartitionValueExtractor) Class.forName(dlaConfig.partitionValueExtractorClass).newInstance();
+    } catch (Exception e) {
+      throw new HoodieException(
+          "Failed to initialize PartitionValueExtractor class " + dlaConfig.partitionValueExtractorClass, e);
+    }
+    createDLAConnection();
+  }
+
+  private void createDLAConnection() {
+    if (connection == null) {
+      try {
+        Class.forName(DRIVER_NAME);
+      } catch (ClassNotFoundException e) {
+        LOG.error("Unable to load DLA driver class", e);
+        return;
+      }
+      try {
+        this.connection = DriverManager.getConnection(dlaConfig.jdbcUrl, dlaConfig.dlaUser, dlaConfig.dlaPass);
+        LOG.info("Successfully established DLA connection to  " + dlaConfig.jdbcUrl);
+      } catch (SQLException e) {
+        throw new HoodieException("Cannot create dla connection ", e);
+      }
+    }
+  }
+
+  @Override
+  public void createTable(String tableName, MessageType storageSchema, String inputFormatClass, String outputFormatClass, String serdeClass) {
+    try {
+      String createSQLQuery = HiveSchemaUtil.generateCreateDDL(tableName, storageSchema, toHiveSyncConfig(), inputFormatClass, outputFormatClass, serdeClass);
+      LOG.info("Creating table with " + createSQLQuery);
+      updateDLASQL(createSQLQuery);
+    } catch (IOException e) {
+      throw new HoodieException("Failed to create table " + tableName, e);
+    }
+  }
+
+  public Map<String, String> getTableSchema(String tableName) {
+    if (!doesTableExist(tableName)) {
+      throw new IllegalArgumentException(
+          "Failed to get schema for table " + tableName + " does not exist");
+    }
+    Map<String, String> schema = new HashMap<>();
+    ResultSet result = null;
+    try {
+      DatabaseMetaData databaseMetaData = connection.getMetaData();
+      result = databaseMetaData.getColumns(dlaConfig.databaseName, dlaConfig.databaseName, tableName, null);
+      while (result.next()) {
+        String columnName = result.getString(4);
+        String columnType = result.getString(6);
+        if ("DECIMAL".equals(columnType)) {
+          int columnSize = result.getInt("COLUMN_SIZE");
+          int decimalDigits = result.getInt("DECIMAL_DIGITS");
+          columnType += String.format("(%s,%s)", columnSize, decimalDigits);
+        }
+        schema.put(columnName, columnType);
+      }
+      return schema;
+    } catch (SQLException e) {
+      throw new HoodieException("Failed to get table schema for " + tableName, e);
+    } finally {
+      closeQuietly(result, null);
+    }
+  }
+
+  @Override
+  public void addPartitionsToTable(String tableName, List<String> partitionsToAdd) {
+    if (partitionsToAdd.isEmpty()) {
+      LOG.info("No partitions to add for " + tableName);
+      return;
+    }
+    LOG.info("Adding partitions " + partitionsToAdd.size() + " to table " + tableName);
+    String sql = constructAddPartitions(tableName, partitionsToAdd);
+    updateDLASQL(sql);
+  }
+
+  public String constructAddPartitions(String tableName, List<String> partitions) {
+    return constructDLAAddPartitions(tableName, partitions);
+  }
+
+  String generateAbsolutePathStr(Path path) {
+    String absolutePathStr = path.toString();
+    if (path.toUri().getScheme() == null) {
+      absolutePathStr = getDefaultFs() + absolutePathStr;
+    }
+    return absolutePathStr.endsWith("/") ? absolutePathStr : absolutePathStr + "/";
+  }
+
+  public List<String> constructChangePartitions(String tableName, List<String> partitions) {
+    List<String> changePartitions = new ArrayList<>();
+    String useDatabase = "USE " + DLA_ESCAPE_CHARACTER + dlaConfig.databaseName + DLA_ESCAPE_CHARACTER;
+    changePartitions.add(useDatabase);
+    String alterTable = "ALTER TABLE " + DLA_ESCAPE_CHARACTER + tableName + DLA_ESCAPE_CHARACTER;
+    for (String partition : partitions) {
+      String partitionClause = getPartitionClause(partition);
+      Path partitionPath = FSUtils.getPartitionPath(dlaConfig.basePath, partition);
+      String fullPartitionPathStr = generateAbsolutePathStr(partitionPath);
+      String changePartition =
+          alterTable + " ADD IF NOT EXISTS PARTITION (" + partitionClause + ") LOCATION '" + fullPartitionPathStr + "'";
+      changePartitions.add(changePartition);
+    }
+    return changePartitions;
+  }
+
+  /**
+   * Generate Hive Partition from partition values.
+   *
+   * @param partition Partition path
+   * @return
+   */
+  public String getPartitionClause(String partition) {
+    List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition);
+    ValidationUtils.checkArgument(dlaConfig.partitionFields.size() == partitionValues.size(),
+        "Partition key parts " + dlaConfig.partitionFields + " does not match with partition values " + partitionValues
+            + ". Check partition strategy. ");
+    List<String> partBuilder = new ArrayList<>();
+    for (int i = 0; i < dlaConfig.partitionFields.size(); i++) {
+      partBuilder.add(dlaConfig.partitionFields.get(i) + "='" + partitionValues.get(i) + "'");
+    }
+    return partBuilder.stream().collect(Collectors.joining(","));
+  }
+
+  private String constructDLAAddPartitions(String tableName, List<String> partitions) {
+    StringBuilder alterSQL = new StringBuilder("ALTER TABLE ");
+    alterSQL.append(DLA_ESCAPE_CHARACTER).append(dlaConfig.databaseName)
+        .append(DLA_ESCAPE_CHARACTER).append(".").append(DLA_ESCAPE_CHARACTER)
+        .append(tableName).append(DLA_ESCAPE_CHARACTER).append(" ADD IF NOT EXISTS ");
+    for (String partition : partitions) {
+      String partitionClause = getPartitionClause(partition);
+      Path partitionPath = FSUtils.getPartitionPath(dlaConfig.basePath, partition);
+      String fullPartitionPathStr = generateAbsolutePathStr(partitionPath);
+      alterSQL.append("  PARTITION (").append(partitionClause).append(") LOCATION '").append(fullPartitionPathStr)
+          .append("' ");
+    }
+    return alterSQL.toString();
+  }
+
+  private void updateDLASQL(String sql) {
+    Statement stmt = null;
+    try {
+      stmt = connection.createStatement();
+      LOG.info("Executing SQL " + sql);
+      stmt.execute(sql);
+    } catch (SQLException e) {
+      throw new HoodieException("Failed in executing SQL " + sql, e);
+    } finally {
+      closeQuietly(null, stmt);
+    }
+  }
+
+  @Override
+  public boolean doesTableExist(String tableName) {
+    String sql = consutructShowCreateTableSQL(tableName);
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      stmt = connection.createStatement();
+      rs = stmt.executeQuery(sql);
+    } catch (SQLException e) {
+      return false;
+    } finally {
+      closeQuietly(rs, stmt);
+    }
+    return true;
+  }
+
+  @Override
+  public Option<String> getLastCommitTimeSynced(String tableName) {
+    String sql = consutructShowCreateTableSQL(tableName);
+    Statement stmt = null;
+    ResultSet rs = null;
+    try {
+      stmt = connection.createStatement();
+      rs = stmt.executeQuery(sql);
+      if (rs.next()) {
+        String table = rs.getString(2);
+        Map<String, String> attr = new HashMap<>();
+        int index = table.indexOf(TBL_PROPERTIES_STR);
+        if (index != -1) {
+          String sub = table.substring(index + TBL_PROPERTIES_STR.length());
+          sub = sub.replaceAll("\\(", "").replaceAll("\\)", "").replaceAll("'", "");
+          String[] str = sub.split(",");
+
+          for (int i = 0; i < str.length; i++) {
+            String key = str[i].split("=")[0].trim();
+            String value = str[i].split("=")[1].trim();
+            attr.put(key, value);
+          }
+        }
+        return Option.ofNullable(attr.getOrDefault(HOODIE_LAST_COMMIT_TIME_SYNC, null));
+      }
+      return Option.empty();
+    } catch (Exception e) {
+      throw new HoodieHiveSyncException("Failed to get the last commit time synced from the table", e);
+    } finally {
+      closeQuietly(rs, stmt);
+    }
+  }
+
+  @Override
+  public void updateLastCommitTimeSynced(String tableName) {
+    // dla do not support update tblproperties, so do nothing.

Review comment:
       done
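
    For readers following the parsing in getLastCommitTimeSynced above, an
    illustrative input (the timestamp is invented) and what the string
    surgery extracts:

        // Tail of a SHOW CREATE TABLE result, as assumed by the parser:
        String tail = "TBLPROPERTIES ('hoodie_last_sync'='20200708161316')";
        String sub = tail
            .substring(tail.indexOf("TBLPROPERTIES") + "TBLPROPERTIES".length())
            .replaceAll("\\(", "").replaceAll("\\)", "").replaceAll("'", "");
        // sub is now " hoodie_last_sync=20200708161316"; splitting on ',' and
        // '=' and trimming yields the entry hoodie_last_sync -> 20200708161316.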




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r465133092



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##########
@@ -267,9 +267,16 @@ public Operation convert(String value) throws ParameterException {
         description = "Should duplicate records from source be dropped/filtered out before insert/bulk-insert")
     public Boolean filterDupes = false;
 
+    //will abandon in the future version, recommended use --enable-sync

Review comment:
       agree with you, and I will do it

##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -255,6 +262,43 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig
   }
 
+  private def metaSync(parameters: Map[String, String],
+                       basePath: Path,
+                       hadoopConf: Configuration): Boolean = {
+    val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var metaSyncEnabled = parameters.get(HUDI_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var syncClientToolClass = parameters.get(SYNC_CLIENT_TOOL_CLASS).get
+    // for backward compatibility
+    if (hiveSyncEnabled) {
+      metaSyncEnabled = true
+      syncClientToolClass = DEFAULT_SYNC_CLIENT_TOOL_CLASS

Review comment:
       yes, when the user sets hiveSyncEnabled and --sync-tool-classes, syncing both hive and the --sync-tool-classes targets makes sense. i will fix it 

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
##########
@@ -67,10 +77,15 @@ String getMetricsName(String action, String metric) {
     return config == null ? null : String.format("%s.%s.%s", tableName, action, metric);
   }
 
-  public void updateDeltaStreamerMetrics(long durationInNs, long hiveSyncNs) {
+  public void updateDeltaStreamerMetrics(long durationInNs, long syncNs, boolean hiveSync) {
     if (config.isMetricsOn()) {
       Metrics.registerGauge(getMetricsName("deltastreamer", "duration"), getDurationInMs(durationInNs));
-      Metrics.registerGauge(getMetricsName("deltastreamer", "hiveSyncDuration"), getDurationInMs(hiveSyncNs));
+      if (hiveSync) {
+        Metrics.registerGauge(getMetricsName("deltastreamer", "hiveSyncDuration"), getDurationInMs(syncNs));
+      } else {
+        Metrics.registerGauge(getMetricsName("deltastreamer", "metaSyncDuration"), getDurationInMs(syncNs));

Review comment:
       i have done it; each sync tool class now has its own metrics, named after the sync class

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -442,7 +449,8 @@ private void refreshTimeline() throws IOException {
     long overallTimeMs = overallTimerContext != null ? overallTimerContext.stop() : 0;
 
     // Send DeltaStreamer Metrics
-    metrics.updateDeltaStreamerMetrics(overallTimeMs, hiveSyncTimeMs);
+    metrics.updateDeltaStreamerMetrics(overallTimeMs, hiveSyncTimeMs, true);
+    metrics.updateDeltaStreamerMetrics(overallTimeMs, metaSyncTimeMs, false);

Review comment:
       ok, have done this in syncMeta




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#issuecomment-669238039


   @lw309637554  this seems ready?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] lw309637554 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
lw309637554 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r455011023



##########
File path: hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.dla;
+
+import com.beust.jcommander.JCommander;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.dla.util.Utils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.InvalidTableException;
+import org.apache.hudi.hadoop.HoodieParquetInputFormat;
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
+import org.apache.hudi.hive.SchemaDifference;
+import org.apache.hudi.hive.util.HiveSchemaUtil;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient;
+import org.apache.hudi.sync.common.AbstractSyncTool;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.schema.MessageType;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+/**
+ * Tool to sync a hoodie table with a dla table. Either use it as a api
+ * DLASyncTool.syncHoodieTable(DLASyncConfig) or as a command line java -cp hoodie-hive.jar DLASyncTool [args]
+ * <p>
+ * This utility will get the schema from the latest commit and will sync dla table schema Also this will sync the
+ * partitions incrementally (all the partitions modified since the last commit)
+ */
+@SuppressWarnings("WeakerAccess")
+public class DLASyncTool extends AbstractSyncTool {
+
+  private static final Logger LOG = LogManager.getLogger(DLASyncTool.class);
+  public static final String SUFFIX_SNAPSHOT_TABLE = "_rt";
+  public static final String SUFFIX_READ_OPTIMIZED_TABLE = "_ro";
+
+  private final DLASyncConfig cfg;
+  private final HoodieDLAClient hoodieDLAClient;
+  private final String snapshotTableName;
+  private final Option<String> roTableTableName;
+
+  public DLASyncTool(Properties properties, FileSystem fs) {
+    super(properties, fs);
+    this.hoodieDLAClient = new HoodieDLAClient(Utils.propertiesToConfig(properties), fs);
+    this.cfg = Utils.propertiesToConfig(properties);
+    switch (hoodieDLAClient.getTableType()) {
+      case COPY_ON_WRITE:
+        this.snapshotTableName = cfg.tableName;
+        this.roTableTableName = Option.empty();
+        break;
+      case MERGE_ON_READ:
+        this.snapshotTableName = cfg.tableName + SUFFIX_SNAPSHOT_TABLE;
+        this.roTableTableName = cfg.skipROSuffix ? Option.of(cfg.tableName) :
+            Option.of(cfg.tableName + SUFFIX_READ_OPTIMIZED_TABLE);
+        break;
+      default:
+        LOG.error("Unknown table type " + hoodieDLAClient.getTableType());
+        throw new InvalidTableException(hoodieDLAClient.getBasePath());
+    }
+  }
+
+  @Override
+  public void syncHoodieTable() {
+    try {
+      switch (hoodieDLAClient.getTableType()) {
+        case COPY_ON_WRITE:
+          syncHoodieTable(snapshotTableName, false);
+          break;
+        case MERGE_ON_READ:
+          // sync a RO table for MOR
+          syncHoodieTable(roTableTableName.get(), false);
+          // sync a RT table for MOR
+          syncHoodieTable(snapshotTableName, true);
+          break;
+        default:
+          LOG.error("Unknown table type " + hoodieDLAClient.getTableType());
+          throw new InvalidTableException(hoodieDLAClient.getBasePath());
+      }
+    } catch (RuntimeException re) {
+      LOG.error("Got runtime exception when dla syncing", re);
+    } finally {
+      hoodieDLAClient.close();
+    }
+  }
+
+  private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat) {
+    LOG.info("Trying to sync hoodie table " + tableName + " with base path " + hoodieDLAClient.getBasePath()
+        + " of type " + hoodieDLAClient.getTableType());
+    // Check if the necessary table exists
+    boolean tableExists = hoodieDLAClient.doesTableExist(tableName);
+    // Get the parquet schema for this table looking at the latest commit
+    MessageType schema = hoodieDLAClient.getDataSchema();
+    // Sync schema if needed
+    syncSchema(tableName, tableExists, useRealtimeInputFormat, schema);
+
+    LOG.info("Schema sync complete. Syncing partitions for " + tableName);
+    // Get the last time we successfully synced partitions
+    Option<String> lastCommitTimeSynced = Option.empty();
+    /*if (tableExists) {
+      lastCommitTimeSynced = hoodieDLAClient.getLastCommitTimeSynced(tableName);
+    }*/
+    LOG.info("Last commit time synced was found to be " + lastCommitTimeSynced.orElse("null"));

Review comment:
       yes




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] garyli1019 commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
garyli1019 commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r451945500



##########
File path: hudi-sync/hudi-hive-sync/pom.xml
##########
@@ -43,6 +45,11 @@
       <artifactId>hudi-hadoop-mr</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-sync-common</artifactId>

Review comment:
       I am thinking whether we can put `hudi-sync-common` into `hudi-common`. Then we would have separate modules for each query engine, like `hudi-sync-hive`, `hudi-sync-dla`, etc.
   Thoughts?

##########
File path: hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sync.common;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public abstract class AbstractSyncHoodieClient {
+  private static final Logger LOG = LogManager.getLogger(AbstractSyncHoodieClient.class);
+  protected final HoodieTableMetaClient metaClient;
+  protected HoodieTimeline activeTimeline;
+  protected final HoodieTableType tableType;
+  protected final FileSystem fs;
+  private String basePath;
+  private boolean assumeDatePartitioning;
+
+  public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, FileSystem fs) {
+    this.metaClient = new HoodieTableMetaClient(fs.getConf(), basePath, true);
+    this.tableType = metaClient.getTableType();
+    this.basePath = basePath;
+    this.assumeDatePartitioning = assumeDatePartitioning;
+    this.fs = fs;
+    this.activeTimeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+  }
+
+  public abstract void createTable(String tableName, MessageType storageSchema,
+                                   String inputFormatClass, String outputFormatClass, String serdeClass);
+
+  public abstract boolean doesTableExist(String tableName);
+
+  public abstract Option<String> getLastCommitTimeSynced(String tableName);
+
+  public abstract void updateLastCommitTimeSynced(String tableName);
+
+  public abstract void addPartitionsToTable(String tableName, List<String> partitionsToAdd);
+
+  public abstract void updatePartitionsToTable(String tableName, List<String> changedPartitions);
+
+  public HoodieTimeline getActiveTimeline() {
+    return activeTimeline;
+  }
+
+  public HoodieTableType getTableType() {
+    return tableType;
+  }
+
+  public String getBasePath() {
+    return metaClient.getBasePath();
+  }
+
+  public FileSystem getFs() {
+    return fs;
+  }
+
+  public void closeQuietly(ResultSet resultSet, Statement stmt) {
+    try {
+      if (stmt != null) {
+        stmt.close();
+      }
+    } catch (SQLException e) {
+      LOG.error("Could not close the statement opened ", e);
+    }
+
+    try {
+      if (resultSet != null) {
+        resultSet.close();
+      }
+    } catch (SQLException e) {
+      LOG.error("Could not close the resultset opened ", e);
+    }
+  }
+
+  /**
+   * Gets the schema for a hoodie table. Depending on the type of table, try to read schema from commit metadata if
+   * present, else fallback to reading from any file written in the latest commit. We will assume that the schema has
+   * not changed within a single atomic write.
+   *
+   * @return Parquet schema for this table
+   */
+  public MessageType getDataSchema() {
+    try {
+      return new TableSchemaResolver(metaClient).getTableParquetSchema();
+    } catch (Exception e) {
+      throw new HoodieSyncException("Failed to read data schema", e);
+    }
+  }
+
+  @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
+  public List<String> getPartitionsWrittenToSince(Option<String> lastCommitTimeSynced) {
+    if (!lastCommitTimeSynced.isPresent()) {
+      LOG.info("Last commit time synced is not known, listing all partitions in " + basePath + ",FS :" + fs);
+      try {
+        return FSUtils.getAllPartitionPaths(fs, basePath, assumeDatePartitioning);
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to list all partitions in " + basePath, e);
+      }
+    } else {
+      LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", Getting commits since then");
+
+      HoodieTimeline timelineToSync = activeTimeline.findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE);
+      return timelineToSync.getInstants().map(s -> {
+        try {
+          return HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(s).get(), HoodieCommitMetadata.class);
+        } catch (IOException e) {
+          throw new HoodieIOException("Failed to get partitions written since " + lastCommitTimeSynced, e);
+        }
+      }).flatMap(s -> s.getPartitionToWriteStats().keySet().stream()).distinct().collect(Collectors.toList());
+    }
+  }
+
+  private MessageType readSchemaFromLastCompaction(Option<HoodieInstant> lastCompactionCommitOpt) throws IOException {

Review comment:
       Can we use `TableSchemaResolver` to handle all hudi schema-related methods?
   https://github.com/apache/hudi/blob/2603cfb33e272632d7f36a53e1b13fe86dbb8627/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java#L338

##########
File path: hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
##########
@@ -272,6 +268,7 @@ void createTable(String tableName, MessageType storageSchema, String inputFormat
   /**
    * Get the table schema.
    */
+  //???? overwrite

Review comment:
       Vote for putting this into the base class, because we need to ensure the schema is backward compatible for all query engines.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -63,10 +67,11 @@
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Objects;
+import java.util.Properties;

Review comment:
       nit: order

##########
File path: hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/SchemaDifference.java
##########
@@ -20,12 +20,14 @@
 
 import org.apache.parquet.schema.MessageType;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.StringJoiner;
+import java.util.ArrayList;

Review comment:
       nit: alphabetical order

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##########
@@ -237,6 +237,9 @@ public Operation convert(String value) throws ParameterException {
     @Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing to hive")
     public Boolean enableHiveSync = false;
 
+    @Parameter(names = {"--hoodie-sync-client-tool-class"}, description = "Meta sync client tool, using comma to separate multi tools")

Review comment:
       Should we change `--enable-hive-sync` to `--enable-sync` above?
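
    For backward compatibility, the deprecated flag can simply imply the new
    one; a hedged Java sketch using the field names from the
    HoodieDeltaStreamer diff earlier in this thread:

        // Honor --enable-hive-sync by folding it into --enable-sync, so
        // existing jobs keep syncing to hive without any config change.
        if (cfg.enableHiveSync) {
          cfg.enableMetaSync = true;
        }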




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] leesf commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
leesf commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r465418437



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -261,6 +268,44 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig
   }
 
+  private def metaSync(parameters: Map[String, String],
+                       basePath: Path,
+                       hadoopConf: Configuration): Boolean = {
+    val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var metaSyncEnabled = parameters.get(HUDI_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+    var syncClientToolClass = parameters(SYNC_CLIENT_TOOL_CLASS)
+    // for backward compatibility
+    if (hiveSyncEnabled) {
+      metaSyncEnabled = true
+      syncClientToolClass = String.format("%s,%s", syncClientToolClass, "org.apache.hudi.hive.HiveSyncTool")

Review comment:
       ditto




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar merged pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
vinothchandar merged pull request #1810:
URL: https://github.com/apache/hudi/pull/1810


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on a change in pull request #1810: [HUDI-875] Abstract hudi-sync-common, and support hudi-hive-sync

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1810:
URL: https://github.com/apache/hudi/pull/1810#discussion_r465468482



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
##########
@@ -67,10 +77,15 @@ String getMetricsName(String action, String metric) {
     return config == null ? null : String.format("%s.%s.%s", tableName, action, metric);
   }
 
-  public void updateDeltaStreamerMetrics(long durationInNs, long hiveSyncNs) {
+  public void updateDeltaStreamerMetrics(long durationInNs) {
     if (config.isMetricsOn()) {
       Metrics.registerGauge(getMetricsName("deltastreamer", "duration"), getDurationInMs(durationInNs));
-      Metrics.registerGauge(getMetricsName("deltastreamer", "hiveSyncDuration"), getDurationInMs(hiveSyncNs));
+    }
+  }
+
+  public void updateDeltaStreamerMetaSyncMetrics(String syncClassName, long syncNs) {

Review comment:
       do we need the entire class name here? Would that not make for a long metric name? :) 
   
   Maybe have a `getShortName()` method on the AbstractSyncTool class and return "hive" and "dla" from the implementations? 
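
    A minimal sketch of that idea (assumed shape, not merged code):

        // In AbstractSyncTool: a short, stable label for metric names.
        public abstract String getShortName();

        // In HiveSyncTool:
        @Override
        public String getShortName() {
          return "hive"; // DLASyncTool would return "dla"
        }

        // The metric name then stays compact, e.g.
        // <tableName>.deltastreamer.hiveSyncDuration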




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org