Posted to commits@hudi.apache.org by "jonvex (via GitHub)" <gi...@apache.org> on 2023/02/24 19:17:34 UTC

[GitHub] [hudi] jonvex opened a new pull request, #8043: [HUDI-5843] multiwriter deltastreamer checkpoints

jonvex opened a new pull request, #8043:
URL: https://github.com/apache/hudi/pull/8043

   ### Change Logs
   
   Add a new config, "hoodie.deltastreamer.multiwriter.source.checkpoint.id". When it is set, multiwriter checkpoints are enabled for deltastreamer. Each deltastreamer instance should use a unique id.
   
   ### Impact
   
   Deltastreamer can write from multiple sources to one table.
   
   ### Risk level (write none, low medium or high below)
   
   low
   
   ### Documentation Update
   
   This needs to be added to the change logs, and may need a section on the deltastreamer page.
   
   ### Contributor's checklist
   
   - [ ] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [ ] Change Logs and Impact were stated clearly
   - [ ] Adequate tests were added if applicable
   - [ ] CI passed
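
To illustrate the "unique id per instance" requirement from the Change Logs above, here is a minimal sketch in plain Java. Only the config key comes from this PR. The class name, the helper, and the ids `source-a` and `source-b` are hypothetical, and how the properties reach each deltastreamer job is not shown.

```java
import java.util.Properties;

// Hypothetical helper, not part of Hudi: builds per-instance properties.
class MultiwriterCheckpointIdSketch {
  // Config key introduced by this PR.
  static final String CHECKPOINT_ID_KEY =
      "hoodie.deltastreamer.multiwriter.source.checkpoint.id";

  // Returns properties carrying the given checkpoint id.
  // The caller is assumed to pick a different id for every instance.
  static Properties propsFor(String checkpointId) {
    Properties props = new Properties();
    props.setProperty(CHECKPOINT_ID_KEY, checkpointId);
    return props;
  }

  public static void main(String[] args) {
    // Two instances writing to the same table, each with its own id.
    Properties first = propsFor("source-a");
    Properties second = propsFor("source-b");
    System.out.println(first.getProperty(CHECKPOINT_ID_KEY));
    System.out.println(second.getProperty(CHECKPOINT_ID_KEY));
  }
}
```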
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #8043: [HUDI-5843] multiwriter deltastreamer checkpoints

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8043:
URL: https://github.com/apache/hudi/pull/8043#issuecomment-1445430165

   ## CI report:
   
   * dbbd03b207d3108988f6c2997f6a3504f39f265d Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15391) 
   * 0e24ef2e8a02aba64ec436e41ca9fce1f268977f Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15407) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #8043: [HUDI-5843] multiwriter deltastreamer checkpoints

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8043:
URL: https://github.com/apache/hudi/pull/8043#issuecomment-1445459123

   ## CI report:
   
   * 0e24ef2e8a02aba64ec436e41ca9fce1f268977f Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15407) 
   


[GitHub] [hudi] hudi-bot commented on pull request #8043: [HUDI-5843] multiwriter deltastreamer checkpoints

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8043:
URL: https://github.com/apache/hudi/pull/8043#issuecomment-1447278384

   ## CI report:
   
   * 31d01b36c88467a747e555f7c35c2e4faf4c04d8 UNKNOWN
   * b820afad66b46286f1465f4a1f5ad699db530326 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15445) 
   


[GitHub] [hudi] hudi-bot commented on pull request #8043: [HUDI-5843] multiwriter deltastreamer checkpoints

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8043:
URL: https://github.com/apache/hudi/pull/8043#issuecomment-1446638836

   ## CI report:
   
   * 0e24ef2e8a02aba64ec436e41ca9fce1f268977f Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15407) 
   * 31d01b36c88467a747e555f7c35c2e4faf4c04d8 UNKNOWN
   * b820afad66b46286f1465f4a1f5ad699db530326 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15445) 
   


[GitHub] [hudi] nsivabalan commented on a diff in pull request #8043: [HUDI-5843] multiwriter deltastreamer checkpoints

Posted by "nsivabalan (via GitHub)" <gi...@apache.org>.
nsivabalan commented on code in PR #8043:
URL: https://github.com/apache/hudi/pull/8043#discussion_r1117688720


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -698,8 +717,13 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSink(JavaRDD<HoodieRec
     boolean hasErrors = totalErrorRecords > 0;
     if (!hasErrors || cfg.commitOnErrors) {
       HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
+      Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc = Option.empty();
       if (checkpointStr != null) {
-        checkpointCommitMetadata.put(CHECKPOINT_KEY, checkpointStr);
+        if (identifier.isPresent()) {
+          extraPreCommitFunc = Option.of(new HoodieDeltaStreamerMultiwriterCheckpoint(this, checkpointStr, latestCheckpointWritten));

Review Comment:
   Fetching of latestCheckpoint should happen within the extraPreCommitFunc, i.e. within the lock.
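
The point of the comment above can be sketched in plain Java: the latest committed state is read only after the lock is taken, so a writer cannot act on a stale view. This is a simplified model under stated assumptions, not Hudi code. `LockedCommitSketch`, its `commit` method, and the writer ids are hypothetical stand-ins for the write client, its pre-commit hook, and the deltastreamer ids.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

// Hypothetical stand-in for a table's committed checkpoint state; not a Hudi class.
class LockedCommitSketch {
  private final ReentrantLock lock = new ReentrantLock();
  // Latest committed checkpoint per writer id.
  private final Map<String, String> committed = new HashMap<>();

  // Runs preCommit while holding the lock, then publishes the result.
  void commit(Consumer<Map<String, String>> preCommit) {
    lock.lock();
    try {
      // The read happens under the lock, so it reflects every earlier commit.
      Map<String, String> next = new HashMap<>(committed);
      preCommit.accept(next);
      committed.clear();
      committed.putAll(next);
    } finally {
      lock.unlock();
    }
  }

  // Returns a copy of the committed state.
  Map<String, String> snapshot() {
    lock.lock();
    try {
      return new HashMap<>(committed);
    } finally {
      lock.unlock();
    }
  }

  public static void main(String[] args) {
    LockedCommitSketch table = new LockedCommitSketch();
    table.commit(m -> m.put("writer-a", "offset-10"));
    // The second writer sees writer-a's entry because it reads inside the lock.
    table.commit(m -> m.put("writer-b", "offset-7"));
    System.out.println(table.snapshot());
  }
}
```

If the state were instead captured before calling `commit` and passed in, a commit by another writer in between would be silently overwritten.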



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMultiwriterCheckpoint.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
+
+/**
+ * This is used as an extraPreCommitFunc in BaseHoodieWriteClient
+ * It adds the checkpoint to deltacommit metadata. It must be implemented this way
+ * because it needs the lock to ensure that it does not overwrite another deltastreamers
+ * latest checkpoint with an older one.
+ */
+public class HoodieDeltaStreamerMultiwriterCheckpoint implements BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata> {

Review Comment:
   nit: this can be renamed to "DeltastreamerMultiWriterCkptUpdateFunc".



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMultiwriterCheckpoint.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
+
+/**
+ * This is used as an extraPreCommitFunc in BaseHoodieWriteClient
+ * It adds the checkpoint to deltacommit metadata. It must be implemented this way
+ * because it needs the lock to ensure that it does not overwrite another deltastreamers
+ * latest checkpoint with an older one.
+ */
+public class HoodieDeltaStreamerMultiwriterCheckpoint implements BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata> {
+  private static final ObjectMapper OM = new ObjectMapper();
+
+  //deltastreamer id
+  private final String id;
+
+  //the deltastreamer
+  private final DeltaSync ds;
+  private final String checkpoint;
+  private final String latestCheckpointWritten;
+  
+  public HoodieDeltaStreamerMultiwriterCheckpoint(DeltaSync ds, String checkpoint, String latestCheckpointWritten) {

Review Comment:
   checkpoint -> checkpointStr
   In fact, we can name it "newCheckpoint".



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMultiwriterCheckpoint.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
+
+/**
+ * This is used as an extraPreCommitFunc in BaseHoodieWriteClient
+ * It adds the checkpoint to deltacommit metadata. It must be implemented this way
+ * because it needs the lock to ensure that it does not overwrite another deltastreamers
+ * latest checkpoint with an older one.
+ */
+public class HoodieDeltaStreamerMultiwriterCheckpoint implements BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata> {
+  private static final ObjectMapper OM = new ObjectMapper();
+
+  //deltastreamer id
+  private final String id;
+
+  //the deltastreamer
+  private final DeltaSync ds;
+  private final String checkpoint;
+  private final String latestCheckpointWritten;
+  
+  public HoodieDeltaStreamerMultiwriterCheckpoint(DeltaSync ds, String checkpoint, String latestCheckpointWritten) {
+    this.ds = ds;
+    this.id = ds.getId();
+    this.checkpoint = checkpoint;
+    this.latestCheckpointWritten = latestCheckpointWritten;
+  }
+
+  @Override
+  public void accept(HoodieTableMetaClient metaClient, HoodieCommitMetadata commitMetadata) {
+    //Get last completed deltacommit
+    Option<HoodieCommitMetadata> latestCommitMetadata;
+    try {
+      ds.refreshTimeline();

Review Comment:
   We don't need to refresh here (and it's not used anywhere).
   In the next line, we can do:
   ```
   metaClient.reloadActiveTimeline().getCommitsTimeline()
   ```



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMultiwriterCheckpoint.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
+
+/**
+ * This is used as an extraPreCommitFunc in BaseHoodieWriteClient
+ * It adds the checkpoint to deltacommit metadata. It must be implemented this way
+ * because it needs the lock to ensure that it does not overwrite another deltastreamers
+ * latest checkpoint with an older one.
+ */
+public class HoodieDeltaStreamerMultiwriterCheckpoint implements BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata> {
+  private static final ObjectMapper OM = new ObjectMapper();
+
+  //deltastreamer id
+  private final String id;
+
+  //the deltastreamer
+  private final DeltaSync ds;
+  private final String checkpoint;
+  private final String latestCheckpointWritten;
+  
+  public HoodieDeltaStreamerMultiwriterCheckpoint(DeltaSync ds, String checkpoint, String latestCheckpointWritten) {
+    this.ds = ds;
+    this.id = ds.getId();
+    this.checkpoint = checkpoint;
+    this.latestCheckpointWritten = latestCheckpointWritten;
+  }
+
+  @Override
+  public void accept(HoodieTableMetaClient metaClient, HoodieCommitMetadata commitMetadata) {
+    //Get last completed deltacommit
+    Option<HoodieCommitMetadata> latestCommitMetadata;
+    try {
+      ds.refreshTimeline();
+      latestCommitMetadata = ds.getLatestCommitMetadataWithValidCheckpointInfo(metaClient.getActiveTimeline().getCommitsTimeline());
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to get the latest commit metadata", e);
+    }
+
+    //Get checkpoint map
+    Map<String,String> checkpointMap;
+    if (latestCommitMetadata.isPresent()) {
+      String value = commitMetadata.getMetadata(CHECKPOINT_KEY);
+      try {
+        checkpointMap = OM.readValue(value, Map.class);
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to parse checkpoint as map", e);
+      }
+    } else {
+      checkpointMap = new HashMap<>();
+    }
+
+    if (!StringUtils.isNullOrEmpty(latestCheckpointWritten)
+        && checkpointMap.containsKey(id) && !checkpointMap.get(id).equals(latestCheckpointWritten)) {
+      throw new HoodieException(String.format("Multiple DeltaStreamer instances with id: %s detected. Each Deltastreamer must have a unique id.", id));
+    }
+
+    //Add map to metadata
+    checkpointMap.put(id, checkpoint);
+    try {
+      commitMetadata.addMetadata(CHECKPOINT_KEY, OM.writeValueAsString(checkpointMap));
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static Option<String> readCheckpointValue(String value, String id) {

Review Comment:
   This is a preCommitFunc; let's not place a static method here.
   Maybe we can create DeltastreamerCheckpointUtils and move this method and any other static methods related to checkpoint parsing there.
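
A rough sketch of what such a utility class could look like follows. It is an assumption-laden illustration, not the PR's code: the class name carries a `Sketch` suffix, the methods take an already-parsed `Map` and return `java.util.Optional` (the quoted method takes the raw `String` value and returns Hudi's `Option`), JSON parsing is left out to stay within the standard library, and `withCheckpoint` is a hypothetical helper.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Sketch of the suggested utility class; the method bodies are assumptions.
final class DeltastreamerCheckpointUtilsSketch {
  private DeltastreamerCheckpointUtilsSketch() {
    // Static helpers only.
  }

  // Looks up the checkpoint recorded for one deltastreamer id, if any.
  static Optional<String> readCheckpointValue(Map<String, String> checkpointMap, String id) {
    return Optional.ofNullable(checkpointMap.get(id));
  }

  // Returns a copy of the map with this writer's entry set to newCheckpoint.
  // Entries belonging to other writers are carried over unchanged.
  static Map<String, String> withCheckpoint(Map<String, String> checkpointMap,
                                            String id, String newCheckpoint) {
    Map<String, String> updated = new HashMap<>(checkpointMap);
    updated.put(id, newCheckpoint);
    return Collections.unmodifiableMap(updated);
  }

  public static void main(String[] args) {
    Map<String, String> before = Collections.singletonMap("writer-b", "7");
    Map<String, String> after = withCheckpoint(before, "writer-a", "10");
    System.out.println(readCheckpointValue(after, "writer-a").isPresent());
  }
}
```

Keeping these helpers free of instance state leaves the pre-commit function with a single responsibility: updating the commit metadata under the lock.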



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -240,6 +243,18 @@ public class DeltaSync implements Serializable, Closeable {
 
   private transient HoodieMetrics hoodieMetrics;
 
+
+  /**
+   * Unique identifier of the deltastreamer
+   * */
+  private transient Option<String> identifier;

Review Comment:
   Maybe we can rename this to `multiWriterIdentifier`.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMultiwriterCheckpoint.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
+
+/**
+ * This is used as an extraPreCommitFunc in BaseHoodieWriteClient
+ * It adds the checkpoint to deltacommit metadata. It must be implemented this way
+ * because it needs the lock to ensure that it does not overwrite another deltastreamers
+ * latest checkpoint with an older one.
+ */
+public class HoodieDeltaStreamerMultiwriterCheckpoint implements BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata> {
+  private static final ObjectMapper OM = new ObjectMapper();
+
+  //deltastreamer id
+  private final String id;
+
+  //the deltastreamer
+  private final DeltaSync ds;
+  private final String checkpoint;
+  private final String latestCheckpointWritten;
+  
+  public HoodieDeltaStreamerMultiwriterCheckpoint(DeltaSync ds, String checkpoint, String latestCheckpointWritten) {
+    this.ds = ds;
+    this.id = ds.getId();
+    this.checkpoint = checkpoint;
+    this.latestCheckpointWritten = latestCheckpointWritten;
+  }
+
+  @Override
+  public void accept(HoodieTableMetaClient metaClient, HoodieCommitMetadata commitMetadata) {
+    //Get last completed deltacommit
+    Option<HoodieCommitMetadata> latestCommitMetadata;
+    try {
+      ds.refreshTimeline();
+      latestCommitMetadata = ds.getLatestCommitMetadataWithValidCheckpointInfo(metaClient.getActiveTimeline().getCommitsTimeline());
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to get the latest commit metadata", e);
+    }
+
+    //Get checkpoint map
+    Map<String,String> checkpointMap;
+    if (latestCommitMetadata.isPresent()) {
+      String value = commitMetadata.getMetadata(CHECKPOINT_KEY);
+      try {
+        checkpointMap = OM.readValue(value, Map.class);
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to parse checkpoint as map", e);
+      }
+    } else {
+      checkpointMap = new HashMap<>();
+    }
+
+    if (!StringUtils.isNullOrEmpty(latestCheckpointWritten)
+        && checkpointMap.containsKey(id) && !checkpointMap.get(id).equals(latestCheckpointWritten)) {
+      throw new HoodieException(String.format("Multiple DeltaStreamer instances with id: %s detected. Each Deltastreamer must have a unique id.", id));

Review Comment:
   What's the last condition here?
   If the checkpoint does not get updated compared to the previous value?
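
One reading of the quoted condition, offered as an interpretation rather than the author's stated intent: the check fires when the checkpoint committed under this id differs from the one this instance last wrote, which suggests another instance is writing with the same id. The sketch below isolates that predicate in plain Java. The class and method names are hypothetical, and unlike the quoted code it treats a `null` stored value as "no entry" instead of dereferencing it.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical, simplified version of the duplicate-id check; not Hudi code.
class DuplicateIdCheckSketch {
  // Returns true when the committed checkpoint for this id is not the one
  // this instance last wrote, i.e. someone else appears to share the id.
  static boolean anotherWriterUsedSameId(Map<String, String> committedCheckpoints,
                                         String id, String latestCheckpointWritten) {
    if (latestCheckpointWritten == null || latestCheckpointWritten.isEmpty()) {
      // This instance has not written anything yet, so there is nothing to compare.
      return false;
    }
    String committed = committedCheckpoints.get(id);
    return committed != null && !committed.equals(latestCheckpointWritten);
  }

  public static void main(String[] args) {
    Map<String, String> committed = Collections.singletonMap("writer-a", "10");
    // Same value as last written: no conflict.
    System.out.println(anotherWriterUsedSameId(committed, "writer-a", "10"));
    // Different value under the same id: conflict.
    System.out.println(anotherWriterUsedSameId(committed, "writer-a", "9"));
  }
}
```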





[GitHub] [hudi] hudi-bot commented on pull request #8043: [HUDI-5843] multiwriter deltastreamer checkpoints

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8043:
URL: https://github.com/apache/hudi/pull/8043#issuecomment-1445428285

   ## CI report:
   
   * dbbd03b207d3108988f6c2997f6a3504f39f265d Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15391) 
   * 0e24ef2e8a02aba64ec436e41ca9fce1f268977f UNKNOWN
   


[GitHub] [hudi] hudi-bot commented on pull request #8043: [HUDI-5843] multiwriter deltastreamer checkpoints

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8043:
URL: https://github.com/apache/hudi/pull/8043#issuecomment-1447367054

   ## CI report:
   
   * 31d01b36c88467a747e555f7c35c2e4faf4c04d8 UNKNOWN
   * b820afad66b46286f1465f4a1f5ad699db530326 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15445) 
   * d97c66ce9bfda59d169f07dba3cfb50c50e1b470 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15452) 
   


[GitHub] [hudi] hudi-bot commented on pull request #8043: [HUDI-5843] multiwriter deltastreamer checkpoints

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8043:
URL: https://github.com/apache/hudi/pull/8043#issuecomment-1444585397

   ## CI report:
   
   * dbbd03b207d3108988f6c2997f6a3504f39f265d Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15391) 
   


[GitHub] [hudi] jonvex commented on a diff in pull request #8043: [HUDI-5843] multiwriter deltastreamer checkpoints

Posted by "jonvex (via GitHub)" <gi...@apache.org>.
jonvex commented on code in PR #8043:
URL: https://github.com/apache/hudi/pull/8043#discussion_r1118885381


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -698,8 +717,13 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSink(JavaRDD<HoodieRec
     boolean hasErrors = totalErrorRecords > 0;
     if (!hasErrors || cfg.commitOnErrors) {
       HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
+      Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc = Option.empty();
       if (checkpointStr != null) {
-        checkpointCommitMetadata.put(CHECKPOINT_KEY, checkpointStr);
+        if (identifier.isPresent()) {
+          extraPreCommitFunc = Option.of(new HoodieDeltaStreamerMultiwriterCheckpoint(this, checkpointStr, latestCheckpointWritten));

Review Comment:
   That does happen inside. `latestCheckpointWritten` is used to detect whether multiple deltastreamers are using the same id. We keep track of the last checkpoint that we wrote, so if it doesn't match the checkpoint for our id in the latest commit, we know that another deltastreamer used the same id.
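
The duplicate-id check described above can be sketched in isolation. This is a minimal illustration, not the PR's code: `CheckpointMergeSketch` and `merge` are hypothetical names, the per-id checkpoint map is modeled as a plain `Map` rather than the JSON stored under `CHECKPOINT_KEY`, and an `IllegalStateException` stands in for `HoodieException`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Hudi: models the per-id checkpoint map as a plain Map.
final class CheckpointMergeSketch {

  // Returns a copy of the committed map with this writer's checkpoint updated.
  // Throws if the checkpoint committed under `id` is not the one this writer last wrote,
  // which suggests another writer is using the same id.
  static Map<String, String> merge(Map<String, String> latestCommitted, String id,
                                   String newCheckpoint, String latestCheckpointWritten) {
    String committed = latestCommitted.get(id);
    boolean hasPrior = latestCheckpointWritten != null && !latestCheckpointWritten.isEmpty();
    if (hasPrior && committed != null && !committed.equals(latestCheckpointWritten)) {
      throw new IllegalStateException("Multiple writers detected with id: " + id);
    }
    // Entries for other ids are carried over untouched.
    Map<String, String> merged = new HashMap<>(latestCommitted);
    merged.put(id, newCheckpoint);
    return merged;
  }
}
```

Checkpoints of other ids are preserved, so a writer only ever advances its own entry. In the PR this logic runs as a pre-commit function so that the read and the update happen under the commit lock.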





[GitHub] [hudi] jonvex commented on pull request #8043: [HUDI-5843] multiwriter deltastreamer checkpoints

Posted by "jonvex (via GitHub)" <gi...@apache.org>.
jonvex commented on PR #8043:
URL: https://github.com/apache/hudi/pull/8043#issuecomment-1447344082

   Addressed comments. The last commits that failed CI failed on the two flakiest tests, so this should hopefully be good to land.




[GitHub] [hudi] nsivabalan merged pull request #8043: [HUDI-5843] multiwriter deltastreamer checkpoints

Posted by "nsivabalan (via GitHub)" <gi...@apache.org>.
nsivabalan merged PR #8043:
URL: https://github.com/apache/hudi/pull/8043




[GitHub] [hudi] nsivabalan commented on a diff in pull request #8043: [HUDI-5843] multiwriter deltastreamer checkpoints

Posted by "nsivabalan (via GitHub)" <gi...@apache.org>.
nsivabalan commented on code in PR #8043:
URL: https://github.com/apache/hudi/pull/8043#discussion_r1119363467


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -620,6 +620,11 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Whether to enable commit conflict checking or not during early "
           + "conflict detection.");
 
+  public static final ConfigProperty<String> MUTLI_WRITER_SOURCE_CHECKPOINT_ID = ConfigProperty
+      .key("hoodie.deltastreamer.multiwriter.source.checkpoint.id")
+      .noDefaultValue()
+      .withDocumentation("Define Unique Id for source to be used in commit checkpoint");

Review Comment:
   Let's add documentation stating that users don't need to set this for a single deltastreamer, and clarify what is meant by "multiwriter source". It is not the general multi-writer setup that users usually have in mind, i.e. one writer using deltastreamer and another writer using spark-datasource.
   This specifically targets use cases where multiple writers, each using deltastreamer, write to the same target table.
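
A minimal sketch of the intended setup, assuming two deltastreamer instances writing to the same table. Only the config key comes from the PR. `MultiwriterIdConfigSketch`, `propsFor`, and `idsAreUnique` are hypothetical helpers for illustration, and the ids `parquet` and `kafka` mirror the ones used in the PR's test.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

// Hypothetical helper, not part of Hudi: builds per-writer properties and checks id uniqueness.
final class MultiwriterIdConfigSketch {
  // Config key introduced by the PR.
  static final String CHECKPOINT_ID_KEY = "hoodie.deltastreamer.multiwriter.source.checkpoint.id";

  // Properties for one deltastreamer instance; a single deltastreamer would leave the key unset.
  static Properties propsFor(String id) {
    Properties props = new Properties();
    props.setProperty(CHECKPOINT_ID_KEY, id);
    return props;
  }

  // True when no two writers that set the key share the same id.
  static boolean idsAreUnique(List<Properties> writers) {
    Set<String> seen = new HashSet<>();
    for (Properties writer : writers) {
      String id = writer.getProperty(CHECKPOINT_ID_KEY);
      if (id != null && !seen.add(id)) {
        return false;
      }
    }
    return true;
  }
}
```

Each deltastreamer instance gets its own id, for example `propsFor("parquet")` and `propsFor("kafka")`. Reusing an id across instances is the case the PR is meant to reject at commit time.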
   



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -1061,4 +1105,8 @@ private Set<String> getPartitionColumns(KeyGenerator keyGenerator, TypedProperti
     String partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, props);
     return Arrays.stream(partitionColumns.split(",")).collect(Collectors.toSet());
   }
+
+  public String getId() {

Review Comment:
   Minor: rename this to `getMultiwriterIdentifier`.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -149,6 +154,7 @@ public class DeltaSync implements Serializable, Closeable {
 
   private static final long serialVersionUID = 1L;
   private static final Logger LOG = LogManager.getLogger(DeltaSync.class);
+  private static final ObjectMapper OM = new ObjectMapper();

Review Comment:
   Can we expand `OM` to `OBJECT_MAPPER`?



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -1911,6 +1920,80 @@ public void testKafkaTimestampType() throws Exception {
     TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS * 2, tableBasePath, sqlContext);
   }
 
+  @Test
+  public void testDeltaStreamerMultiwriterCheckpoint() throws Exception {
+    // prep parquet source
+    PARQUET_SOURCE_ROOT = basePath + "/parquetFilesMultiCheckpoint" + testNum;
+    int parquetRecords = 100;
+    HoodieTestDataGenerator dataGenerator = prepareParquetDFSFiles(parquetRecords, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, true,
+        HoodieTestDataGenerator.TRIP_SCHEMA, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
+
+    prepareParquetDFSSource(true, true, "source_uber.avsc", "target_uber.avsc", PROPS_FILENAME_TEST_PARQUET,
+        PARQUET_SOURCE_ROOT, false, "driver");
+
+    // delta streamer w/ parquet source
+    String tableBasePath = basePath + "/test_multi_checkpoint" + testNum;
+    HoodieDeltaStreamer.Config parquetCfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
+        Collections.emptyList(), PROPS_FILENAME_TEST_PARQUET, false,
+        true, Integer.MAX_VALUE, false, null, null, "timestamp", null);
+    parquetCfg.configs = new ArrayList<>();
+    parquetCfg.configs.add(MUTLI_WRITER_SOURCE_CHECKPOINT_ID.key() + "=parquet");
+    //parquetCfg.continuousMode = false;
+    HoodieDeltaStreamer parquetDs = new HoodieDeltaStreamer(parquetCfg, jsc);
+    parquetDs.sync();
+    TestHelpers.assertRecordCount(100, tableBasePath, sqlContext);
+
+    // prep json kafka source
+    topicName = "topic" + testNum;
+    prepareJsonKafkaDFSFiles(20, true, topicName);
+    Map<String, String> kafkaExtraProps = new HashMap<>();
+    kafkaExtraProps.put(MUTLI_WRITER_SOURCE_CHECKPOINT_ID.key(), "kafka");
+    prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest", topicName, kafkaExtraProps);
+    // delta streamer w/ json kafka source
+    HoodieDeltaStreamer kafkaDs = new HoodieDeltaStreamer(
+        TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
+            Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
+            true, Integer.MAX_VALUE, false, null, null, "timestamp", null), jsc);
+    kafkaDs.sync();
+    int totalExpectedRecords = parquetRecords + 20;
+    TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath, sqlContext);
+    //parquet again
+    prepareParquetDFSUpdates(parquetRecords, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, true, HoodieTestDataGenerator.TRIP_SCHEMA, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA,
+        dataGenerator, "001");
+
+    //parquetCfg.continuousMode = false;
+    parquetDs = new HoodieDeltaStreamer(parquetCfg, jsc);
+    parquetDs.sync();
+    TestHelpers.assertRecordCount(parquetRecords * 2 + 20, tableBasePath, sqlContext);
+
+    HoodieTableMetaClient metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), tableBasePath);
+    List<HoodieInstant> instants = metaClient.getCommitsTimeline().getInstants();
+
+    ObjectMapper om = new ObjectMapper();
+    HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+        .fromBytes(metaClient.getCommitsTimeline().getInstantDetails(instants.get(0)).get(), HoodieCommitMetadata.class);
+    Map<String,String>  m = om.readValue(commitMetadata.getExtraMetadata().get(CHECKPOINT_KEY), Map.class);

Review Comment:
   Let's use descriptive names even in tests; `m`, for instance, should be renamed.



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -1911,6 +1920,80 @@ public void testKafkaTimestampType() throws Exception {
     TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS * 2, tableBasePath, sqlContext);
   }
 
+  @Test
+  public void testDeltaStreamerMultiwriterCheckpoint() throws Exception {
+    // prep parquet source
+    PARQUET_SOURCE_ROOT = basePath + "/parquetFilesMultiCheckpoint" + testNum;
+    int parquetRecords = 100;
+    HoodieTestDataGenerator dataGenerator = prepareParquetDFSFiles(parquetRecords, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, true,
+        HoodieTestDataGenerator.TRIP_SCHEMA, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
+
+    prepareParquetDFSSource(true, true, "source_uber.avsc", "target_uber.avsc", PROPS_FILENAME_TEST_PARQUET,
+        PARQUET_SOURCE_ROOT, false, "driver");
+
+    // delta streamer w/ parquet source
+    String tableBasePath = basePath + "/test_multi_checkpoint" + testNum;
+    HoodieDeltaStreamer.Config parquetCfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
+        Collections.emptyList(), PROPS_FILENAME_TEST_PARQUET, false,
+        true, Integer.MAX_VALUE, false, null, null, "timestamp", null);
+    parquetCfg.configs = new ArrayList<>();
+    parquetCfg.configs.add(MUTLI_WRITER_SOURCE_CHECKPOINT_ID.key() + "=parquet");
+    //parquetCfg.continuousMode = false;
+    HoodieDeltaStreamer parquetDs = new HoodieDeltaStreamer(parquetCfg, jsc);
+    parquetDs.sync();
+    TestHelpers.assertRecordCount(100, tableBasePath, sqlContext);
+
+    // prep json kafka source
+    topicName = "topic" + testNum;
+    prepareJsonKafkaDFSFiles(20, true, topicName);
+    Map<String, String> kafkaExtraProps = new HashMap<>();
+    kafkaExtraProps.put(MUTLI_WRITER_SOURCE_CHECKPOINT_ID.key(), "kafka");
+    prepareJsonKafkaDFSSource(PROPS_FILENAME_TEST_JSON_KAFKA, "earliest", topicName, kafkaExtraProps);
+    // delta streamer w/ json kafka source
+    HoodieDeltaStreamer kafkaDs = new HoodieDeltaStreamer(
+        TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
+            Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
+            true, Integer.MAX_VALUE, false, null, null, "timestamp", null), jsc);
+    kafkaDs.sync();
+    int totalExpectedRecords = parquetRecords + 20;
+    TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath, sqlContext);
+    //parquet again
+    prepareParquetDFSUpdates(parquetRecords, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, true, HoodieTestDataGenerator.TRIP_SCHEMA, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA,
+        dataGenerator, "001");
+
+    //parquetCfg.continuousMode = false;

Review Comment:
   remove extraneous lines





[GitHub] [hudi] hudi-bot commented on pull request #8043: [HUDI-5843] multiwriter deltastreamer checkpoints

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8043:
URL: https://github.com/apache/hudi/pull/8043#issuecomment-1447351809

   ## CI report:
   
   * 31d01b36c88467a747e555f7c35c2e4faf4c04d8 UNKNOWN
   * b820afad66b46286f1465f4a1f5ad699db530326 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15445) 
   * d97c66ce9bfda59d169f07dba3cfb50c50e1b470 UNKNOWN
   


[GitHub] [hudi] hudi-bot commented on pull request #8043: [HUDI-5843] multiwriter deltastreamer checkpoints

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8043:
URL: https://github.com/apache/hudi/pull/8043#issuecomment-1447570557

   ## CI report:
   
   * 31d01b36c88467a747e555f7c35c2e4faf4c04d8 UNKNOWN
   * d97c66ce9bfda59d169f07dba3cfb50c50e1b470 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15452) 
   


[GitHub] [hudi] hudi-bot commented on pull request #8043: [HUDI-5843] multiwriter deltastreamer checkpoints

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8043:
URL: https://github.com/apache/hudi/pull/8043#issuecomment-1444389392

   ## CI report:
   
   * dbbd03b207d3108988f6c2997f6a3504f39f265d Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15391) 
   


[GitHub] [hudi] hudi-bot commented on pull request #8043: [HUDI-5843] multiwriter deltastreamer checkpoints

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8043:
URL: https://github.com/apache/hudi/pull/8043#issuecomment-1446564928

   ## CI report:
   
   * 0e24ef2e8a02aba64ec436e41ca9fce1f268977f Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15407) 
   * 31d01b36c88467a747e555f7c35c2e4faf4c04d8 UNKNOWN
   * b820afad66b46286f1465f4a1f5ad699db530326 UNKNOWN
   


[GitHub] [hudi] hudi-bot commented on pull request #8043: [HUDI-5843] multiwriter deltastreamer checkpoints

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8043:
URL: https://github.com/apache/hudi/pull/8043#issuecomment-1446552120

   ## CI report:
   
   * 0e24ef2e8a02aba64ec436e41ca9fce1f268977f Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15407) 
   * 31d01b36c88467a747e555f7c35c2e4faf4c04d8 UNKNOWN
   


[GitHub] [hudi] jonvex commented on a diff in pull request #8043: [HUDI-5843] multiwriter deltastreamer checkpoints

Posted by "jonvex (via GitHub)" <gi...@apache.org>.
jonvex commented on code in PR #8043:
URL: https://github.com/apache/hudi/pull/8043#discussion_r1118900157


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMultiwriterCheckpoint.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
+
+/**
+ * This is used as an extraPreCommitFunc in BaseHoodieWriteClient
+ * It adds the checkpoint to deltacommit metadata. It must be implemented this way
+ * because it needs the lock to ensure that it does not overwrite another deltastreamers
+ * latest checkpoint with an older one.
+ */
+public class HoodieDeltaStreamerMultiwriterCheckpoint implements BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata> {
+  private static final ObjectMapper OM = new ObjectMapper();
+
+  //deltastreamer id
+  private final String id;
+
+  //the deltastreamer
+  private final DeltaSync ds;
+  private final String checkpoint;
+  private final String latestCheckpointWritten;
+  
+  public HoodieDeltaStreamerMultiwriterCheckpoint(DeltaSync ds, String checkpoint, String latestCheckpointWritten) {
+    this.ds = ds;
+    this.id = ds.getId();
+    this.checkpoint = checkpoint;
+    this.latestCheckpointWritten = latestCheckpointWritten;
+  }
+
+  @Override
+  public void accept(HoodieTableMetaClient metaClient, HoodieCommitMetadata commitMetadata) {
+    //Get last completed deltacommit
+    Option<HoodieCommitMetadata> latestCommitMetadata;
+    try {
+      ds.refreshTimeline();
+      latestCommitMetadata = ds.getLatestCommitMetadataWithValidCheckpointInfo(metaClient.getActiveTimeline().getCommitsTimeline());
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to get the latest commit metadata", e);
+    }
+
+    //Get checkpoint map
+    Map<String,String> checkpointMap;
+    if (latestCommitMetadata.isPresent()) {
+      String value = commitMetadata.getMetadata(CHECKPOINT_KEY);
+      try {
+        checkpointMap = OM.readValue(value, Map.class);
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to parse checkpoint as map", e);
+      }
+    } else {
+      checkpointMap = new HashMap<>();
+    }
+
+    if (!StringUtils.isNullOrEmpty(latestCheckpointWritten)
+        && checkpointMap.containsKey(id) && !checkpointMap.get(id).equals(latestCheckpointWritten)) {
+      throw new HoodieException(String.format("Multiple DeltaStreamer instances with id: %s detected. Each Deltastreamer must have a unique id.", id));
+    }
+
+    //Add map to metadata
+    checkpointMap.put(id, checkpoint);
+    try {
+      commitMetadata.addMetadata(CHECKPOINT_KEY, OM.writeValueAsString(checkpointMap));
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static Option<String> readCheckpointValue(String value, String id) {

Review Comment:
   I thought it made more sense to keep this logic here, since that also reduces the number of ObjectMappers used, but I have moved it back into DeltaSync.





[GitHub] [hudi] hudi-bot commented on pull request #8043: [HUDI-5843] multiwriter deltastreamer checkpoints

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #8043:
URL: https://github.com/apache/hudi/pull/8043#issuecomment-1444316323

   ## CI report:
   
   * dbbd03b207d3108988f6c2997f6a3504f39f265d UNKNOWN
   


[GitHub] [hudi] jonvex commented on a diff in pull request #8043: [HUDI-5843] multiwriter deltastreamer checkpoints

Posted by "jonvex (via GitHub)" <gi...@apache.org>.
jonvex commented on code in PR #8043:
URL: https://github.com/apache/hudi/pull/8043#discussion_r1118895471


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMultiwriterCheckpoint.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
+
+/**
+ * This is used as an extraPreCommitFunc in BaseHoodieWriteClient
+ * It adds the checkpoint to deltacommit metadata. It must be implemented this way
+ * because it needs the lock to ensure that it does not overwrite another deltastreamers
+ * latest checkpoint with an older one.
+ */
+public class HoodieDeltaStreamerMultiwriterCheckpoint implements BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata> {
+  private static final ObjectMapper OM = new ObjectMapper();
+
+  //deltastreamer id
+  private final String id;
+
+  //the deltastreamer
+  private final DeltaSync ds;
+  private final String checkpoint;
+  private final String latestCheckpointWritten;
+  
+  public HoodieDeltaStreamerMultiwriterCheckpoint(DeltaSync ds, String checkpoint, String latestCheckpointWritten) {
+    this.ds = ds;
+    this.id = ds.getId();
+    this.checkpoint = checkpoint;
+    this.latestCheckpointWritten = latestCheckpointWritten;
+  }
+
+  @Override
+  public void accept(HoodieTableMetaClient metaClient, HoodieCommitMetadata commitMetadata) {
+    //Get last completed deltacommit
+    Option<HoodieCommitMetadata> latestCommitMetadata;
+    try {
+      ds.refreshTimeline();
+      latestCommitMetadata = ds.getLatestCommitMetadataWithValidCheckpointInfo(metaClient.getActiveTimeline().getCommitsTimeline());
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to get the latest commit metadata", e);
+    }
+
+    //Get checkpoint map
+    Map<String,String> checkpointMap;
+    if (latestCommitMetadata.isPresent()) {
+      String value = commitMetadata.getMetadata(CHECKPOINT_KEY);
+      try {
+        checkpointMap = OM.readValue(value, Map.class);
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to parse checkpoint as map", e);
+      }
+    } else {
+      checkpointMap = new HashMap<>();
+    }
+
+    if (!StringUtils.isNullOrEmpty(latestCheckpointWritten)
+        && checkpointMap.containsKey(id) && !checkpointMap.get(id).equals(latestCheckpointWritten)) {
+      throw new HoodieException(String.format("Multiple DeltaStreamer instances with id: %s detected. Each Deltastreamer must have a unique id.", id));

Review Comment:
   If the checkpoint stored under this id in the latest commit does not match the last checkpoint this deltastreamer wrote, it means that another deltastreamer is using the same id.
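
   To make the check above easier to follow, here is a minimal stand-alone sketch of the same comparison. It is not the PR's class: `DuplicateIdCheckSketch`, `isDuplicateId`, and the writer ids and offset strings are hypothetical names chosen for illustration, and the checkpoint map is assumed to be already parsed from the latest commit.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of the duplicate-id check; not part of the PR. */
public class DuplicateIdCheckSketch {

  /**
   * Returns true when the checkpoint stored under {@code id} in the latest commit
   * differs from the last checkpoint this writer recorded, which suggests that
   * another writer is committing under the same id.
   */
  public static boolean isDuplicateId(Map<String, String> checkpointMap,
                                      String id,
                                      String latestCheckpointWritten) {
    // A writer that has not committed anything yet has nothing to compare against.
    if (latestCheckpointWritten == null || latestCheckpointWritten.isEmpty()) {
      return false;
    }
    // An id missing from the map means no writer has committed under it so far.
    return checkpointMap.containsKey(id)
        && !checkpointMap.get(id).equals(latestCheckpointWritten);
  }

  public static void main(String[] args) {
    // Assumed shape of the checkpoint map in the latest commit: writer id -> checkpoint.
    Map<String, String> latest = new HashMap<>();
    latest.put("writer-a", "offset-10");
    latest.put("writer-b", "offset-7");

    // writer-a last wrote offset-10 and the commit agrees.
    System.out.println(isDuplicateId(latest, "writer-a", "offset-10")); // false
    // writer-a last wrote offset-8 but the commit says offset-10: someone else advanced it.
    System.out.println(isDuplicateId(latest, "writer-a", "offset-8"));  // true
    // writer-c has no entry in the latest commit yet.
    System.out.println(isDuplicateId(latest, "writer-c", "offset-3"));  // false
  }
}
```

   The comparison is only meaningful while the table lock is held, since otherwise another writer could commit between reading the latest checkpoint and writing the new one.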


