Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/10/06 23:48:57 UTC

[GitHub] [hudi] bvaradar commented on a change in pull request #1566: [HUDI-603]: DeltaStreamer can now fetch schema before every run in continuous mode

bvaradar commented on a change in pull request #1566:
URL: https://github.com/apache/hudi/pull/1566#discussion_r500657915



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##########
@@ -624,21 +624,26 @@ private void shutdownCompactor(boolean error) {
      */
     protected Boolean onInitializingWriteClient(SparkRDDWriteClient writeClient) {
       if (cfg.isAsyncCompactionEnabled()) {
-        asyncCompactService = new SparkAsyncCompactService(new HoodieSparkEngineContext(jssc), writeClient);
-        // Enqueue existing pending compactions first
-        HoodieTableMetaClient meta =
-            new HoodieTableMetaClient(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath, true);
-        List<HoodieInstant> pending = CompactionUtils.getPendingCompactionInstantTimes(meta);
-        pending.forEach(hoodieInstant -> asyncCompactService.enqueuePendingCompaction(hoodieInstant));
-        asyncCompactService.start((error) -> {
-          // Shutdown DeltaSync
-          shutdown(false);
-          return true;
-        });
-        try {
-          asyncCompactService.waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions);
-        } catch (InterruptedException ie) {
-          throw new HoodieException(ie);
+        if (null != asyncCompactService) {
+          // Update the write client used by Async Compactor.
+          asyncCompactService.updateWriteClient(writeClient);

Review comment:
       Should work as is, but adding synchronized in case future changes make this code not thread-safe.
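
The synchronized remark concerns updateWriteClient: DeltaSync presumably swaps in a new write client on its own thread while the async compaction service reads the client on another thread. The following is a minimal sketch of that pattern. AsyncServiceSketch and currentWriteClient are hypothetical names, not Hudi's SparkAsyncCompactService API, and the client type is left generic so the sketch compiles without Hudi on the classpath.

```java
import java.util.Objects;

/**
 * Hypothetical stand-in for an async compaction service (not Hudi's SparkAsyncCompactService).
 * C is the write-client type; in the PR it would be SparkRDDWriteClient. It is kept generic
 * here so the sketch compiles on its own.
 */
public class AsyncServiceSketch<C> {

  // Guarded by "this": replaced by the ingestion thread, read by the compaction thread.
  private C writeClient;

  public AsyncServiceSketch(C initialClient) {
    this.writeClient = Objects.requireNonNull(initialClient, "initialClient");
  }

  /** Swap in a newly created write client, e.g. after DeltaSync rebuilt it for a new schema. */
  public synchronized void updateWriteClient(C newClient) {
    this.writeClient = Objects.requireNonNull(newClient, "newClient");
  }

  /** Read under the same lock, so a caller sees the most recently published client. */
  public synchronized C currentWriteClient() {
    return writeClient;
  }

  public static void main(String[] args) throws InterruptedException {
    AsyncServiceSketch<String> service = new AsyncServiceSketch<>("client-v1");
    // Simulate the ingestion thread handing over a new client.
    Thread ingestion = new Thread(() -> service.updateWriteClient("client-v2"));
    ingestion.start();
    ingestion.join();
    System.out.println(service.currentWriteClient()); // prints client-v2
  }
}
```

For a single reference swap, a volatile field would give the same visibility guarantee. The synchronized version costs little here and leaves room for compound updates later, which matches the future-proofing the comment describes. Neither approach makes it safe to replace a client while a compaction is still using the old one. That would require the compaction to hold the lock for its duration, or to capture the client once per run.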

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -534,19 +565,33 @@ public void syncHive(HiveConf conf) {
     syncHive();
   }
 
-  /**
-   * Note that depending on configs and source-type, schemaProvider could either be eagerly or lazily created.
-   * SchemaProvider creation is a precursor to HoodieWriteClient and AsyncCompactor creation. This method takes care of
-   * this constraint.
-   */
-  private void setupWriteClient() {
-    LOG.info("Setting up Hoodie Write Client");
-    if ((null != schemaProvider) && (null == writeClient)) {
-      registerAvroSchemas(schemaProvider);
-      HoodieWriteConfig hoodieCfg = getHoodieClientConfig(schemaProvider);
-      writeClient = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(jssc), hoodieCfg, true);
-      onInitializingHoodieWriteClient.apply(writeClient);
+  public void setupWriteClient() throws IOException {
+    if ((null != schemaProvider)) {
+      Schema sourceSchema = schemaProvider.getSourceSchema();
+      Schema targetSchema = schemaProvider.getTargetSchema();
+      createNewWriteClient(sourceSchema, targetSchema);
+    }
+  }
+
+  private void createNewWriteClient(Schema sourceSchema, Schema targetSchema) throws IOException {

Review comment:
       Done
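
In the DeltaSync.java hunk above, the new setupWriteClient() drops the old "null == writeClient" guard. As a result, every call re-reads the source and target schemas from the schema provider and hands them to createNewWriteClient, which is what lets continuous mode pick up schema changes between rounds. The body of createNewWriteClient is not shown in this hunk. The sketch below mimics the flow outside Hudi, under several assumptions: schemas are plain Strings instead of Avro Schema objects, SchemaSource and WriteClient are hypothetical placeholders for SchemaProvider and SparkRDDWriteClient, and the onNewClient callback stands in for onInitializingHoodieWriteClient, which in the first hunk forwards the client to the async compactor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical sketch of per-round write-client setup in continuous mode.
 * Not Hudi code: schemas are Strings instead of Avro Schema objects,
 * and WriteClient is a placeholder for SparkRDDWriteClient.
 */
public class SchemaRefreshSketch {

  /** Hypothetical stand-in for Hudi's SchemaProvider. */
  public interface SchemaSource {
    String getSourceSchema();
    String getTargetSchema();
  }

  /** Placeholder write client that only records the schemas it was built with. */
  public static final class WriteClient {
    public final String sourceSchema;
    public final String targetSchema;

    WriteClient(String sourceSchema, String targetSchema) {
      this.sourceSchema = sourceSchema;
      this.targetSchema = targetSchema;
    }
  }

  private final SchemaSource schemaSource;          // may be null, mirroring the null check in the diff
  private final Consumer<WriteClient> onNewClient;  // e.g. passes the client to the async compactor
  private WriteClient writeClient;

  public SchemaRefreshSketch(SchemaSource schemaSource, Consumer<WriteClient> onNewClient) {
    this.schemaSource = schemaSource;
    this.onNewClient = onNewClient;
  }

  /** Called before every round: re-read both schemas and rebuild the client. */
  public void setupWriteClient() {
    if (schemaSource != null) {
      createNewWriteClient(schemaSource.getSourceSchema(), schemaSource.getTargetSchema());
    }
  }

  private void createNewWriteClient(String sourceSchema, String targetSchema) {
    writeClient = new WriteClient(sourceSchema, targetSchema);
    onNewClient.accept(writeClient);
  }

  public WriteClient currentWriteClient() {
    return writeClient;
  }

  public static void main(String[] args) {
    // Provider whose target schema changes between two rounds.
    final String[] target = {"schema-v1"};
    SchemaSource provider = new SchemaSource() {
      @Override public String getSourceSchema() { return "source-schema"; }
      @Override public String getTargetSchema() { return target[0]; }
    };
    List<WriteClient> handedToCompactor = new ArrayList<>();
    SchemaRefreshSketch sync = new SchemaRefreshSketch(provider, handedToCompactor::add);

    sync.setupWriteClient();   // round 1: client built for schema-v1
    target[0] = "schema-v2";   // upstream schema evolves
    sync.setupWriteClient();   // round 2: client rebuilt for schema-v2

    System.out.println(sync.currentWriteClient().targetSchema); // prints schema-v2
    System.out.println(handedToCompactor.size());               // prints 2
  }
}
```

Rebuilding the client unconditionally is the simplest way to ensure it never writes with a stale schema, at the cost of constructing a new client every round. This hunk does not show whether the real createNewWriteClient also re-registers Avro schemas, as the removed code did via registerAvroSchemas.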




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org