You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/10/04 23:28:19 UTC

[GitHub] [hudi] vinothchandar commented on a change in pull request #1566: [HUDI-603]: DeltaStreamer can now fetch schema before every run in continuous mode

vinothchandar commented on a change in pull request #1566:
URL: https://github.com/apache/hudi/pull/1566#discussion_r499299850



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -244,6 +263,18 @@ private void refreshTimeline() throws IOException {
         this.schemaProvider = srcRecordsWithCkpt.getKey();

Review comment:
       I really dislike all the nested ifs-and null checks. :( there ought to be a better way of structuring this code overall. Side rant 

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -534,19 +565,33 @@ public void syncHive(HiveConf conf) {
     syncHive();
   }
 
-  /**
-   * Note that depending on configs and source-type, schemaProvider could either be eagerly or lazily created.
-   * SchemaProvider creation is a precursor to HoodieWriteClient and AsyncCompactor creation. This method takes care of
-   * this constraint.
-   */
-  private void setupWriteClient() {
-    LOG.info("Setting up Hoodie Write Client");
-    if ((null != schemaProvider) && (null == writeClient)) {
-      registerAvroSchemas(schemaProvider);
-      HoodieWriteConfig hoodieCfg = getHoodieClientConfig(schemaProvider);
-      writeClient = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jssc), hoodieCfg, true);
-      onInitializingHoodieWriteClient.apply(writeClient);
+  public void setupWriteClient() throws IOException {
+    if ((null != schemaProvider)) {
+      Schema sourceSchema = schemaProvider.getSourceSchema();
+      Schema targetSchema = schemaProvider.getTargetSchema();
+      createNewWriteClient(sourceSchema, targetSchema);
+    }
+  }
+
+  private void createNewWriteClient(Schema sourceSchema, Schema targetSchema) throws IOException {

Review comment:
       rename: reInitWriteClient() 

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##########
@@ -624,21 +624,26 @@ private void shutdownCompactor(boolean error) {
      */
     protected Boolean onInitializingWriteClient(SparkRDDWriteClient writeClient) {
       if (cfg.isAsyncCompactionEnabled()) {
-        asyncCompactService = new SparkAsyncCompactService(new HoodieSparkEngineContext(jssc), writeClient);
-        // Enqueue existing pending compactions first
-        HoodieTableMetaClient meta =
-            new HoodieTableMetaClient(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath, true);
-        List<HoodieInstant> pending = CompactionUtils.getPendingCompactionInstantTimes(meta);
-        pending.forEach(hoodieInstant -> asyncCompactService.enqueuePendingCompaction(hoodieInstant));
-        asyncCompactService.start((error) -> {
-          // Shutdown DeltaSync
-          shutdown(false);
-          return true;
-        });
-        try {
-          asyncCompactService.waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions);
-        } catch (InterruptedException ie) {
-          throw new HoodieException(ie);
+        if (null != asyncCompactService) {
+          // Update the write client used by Async Compactor.
+          asyncCompactService.updateWriteClient(writeClient);

Review comment:
       any need to make this `synchronized` across threads? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org