You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/04/04 01:57:44 UTC

[incubator-hudi] branch master updated: HUDI-644 kafka connect checkpoint provider (#1453)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 575d87c  HUDI-644 kafka connect checkpoint provider (#1453)
575d87c is described below

commit 575d87cf7d6f0f743cb7cec6520d80e6fcc3e20a
Author: YanJia-Gary-Li <ya...@gmail.com>
AuthorDate: Fri Apr 3 18:57:34 2020 -0700

    HUDI-644 kafka connect checkpoint provider (#1453)
---
 .../checkpointing/InitialCheckPointProvider.java   |  31 +++++
 .../checkpointing/KafkaConnectHdfsProvider.java    | 152 +++++++++++++++++++++
 .../TestKafkaConnectHdfsProvider.java              |  94 +++++++++++++
 3 files changed, 277 insertions(+)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckPointProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckPointProvider.java
new file mode 100644
index 0000000..741b05c
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/InitialCheckPointProvider.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.checkpointing;
+
+import org.apache.hudi.exception.HoodieException;
+
+/**
+ * Provide the initial checkpoint for delta streamer.
+ *
+ * <p>Implementations derive a starting checkpoint from an external source so that it can be
+ * handed over as a plain string.
+ */
+public interface InitialCheckPointProvider {
+  /**
+   * Get checkpoint string recognizable for delta streamer.
+   *
+   * @return the checkpoint encoded as a string; the exact layout is defined by the implementation
+   *         (for example {@code KafkaConnectHdfsProvider} produces {@code topic,partition:offset,...})
+   * @throws HoodieException if the checkpoint cannot be determined
+   */
+  String getCheckpoint() throws HoodieException;
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/KafkaConnectHdfsProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/KafkaConnectHdfsProvider.java
new file mode 100644
index 0000000..f464f68
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/checkpointing/KafkaConnectHdfsProvider.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.checkpointing;
+
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Generate checkpoint from Kafka-Connect-HDFS managed data set.
+ * Documentation: https://docs.confluent.io/current/connect/kafka-connect-hdfs/index.html
+ *
+ * <p>The provider walks the partition directories below the base path, collects every file whose
+ * name follows the {@code topic+partition+lowerOffset+upperOffset.ext} convention and reports, for
+ * each Kafka partition, the highest upper offset that was found.
+ */
+public class KafkaConnectHdfsProvider implements InitialCheckPointProvider {
+  private final Path path;
+  private final FileSystem fs;
+
+  /**
+   * Create a provider scanning the given location.
+   *
+   * @param basePath Root directory of the data written for one topic
+   * @param fileSystem File system used to list {@code basePath}
+   */
+  public KafkaConnectHdfsProvider(final Path basePath, final FileSystem fileSystem) {
+    this.path = basePath;
+    this.fs = fileSystem;
+  }
+
+  /**
+   * PathFilter for Kafka-Connect-HDFS.
+   * Directory format: /partition1=xxx/partition2=xxx
+   * File format: topic+partition+lowerOffset+upperOffset.file
+   */
+  public static class KafkaConnectPathFilter implements PathFilter {
+    private static final Pattern DIRECTORY_PATTERN = Pattern.compile(".*=.*");
+    // Capture groups: 1 = topic, 2 = partition, 3 = lower offset, 4 = upper offset.
+    // The extension separator is an escaped dot so only a literal "." introduces the extension.
+    private static final Pattern PATTERN =
+        Pattern.compile("([a-zA-Z0-9\\._\\-]+)\\+(\\d+)\\+(\\d+)\\+(\\d+)(\\.\\w+)?");
+
+    /**
+     * Check whether the last path component is a Kafka-Connect-HDFS data file name.
+     *
+     * @param path Path to test
+     * @return true if the name matches {@code topic+partition+lowerOffset+upperOffset[.ext]}
+     */
+    @Override
+    public boolean accept(final Path path) {
+      final String filename = path.getName();
+      final Matcher matcher = PATTERN.matcher(filename);
+      return matcher.matches();
+    }
+
+    /**
+     * Check whether the last path component looks like a partition directory.
+     *
+     * @param path Path to test
+     * @return true if the name contains a {@code =}, e.g. {@code year=2016}
+     */
+    public boolean acceptDir(final Path path) {
+      final String dirName = path.getName();
+      final Matcher matcher = DIRECTORY_PATTERN.matcher(dirName);
+      return matcher.matches();
+    }
+  }
+
+  /**
+   * Convert map contains max offset of each partition to string.
+   * Partitions are emitted in ascending order starting from 0, so the map is expected to hold
+   * every partition id in {@code [0, checkpoint.size())}.
+   *
+   * @param topic Topic name
+   * @param checkpoint Map with partition as key and max offset as value
+   * @return Checkpoint string in the form {@code topic,0:offset0,1:offset1,...}
+   */
+  private static String buildCheckpointStr(final String topic,
+                                           final HashMap<Integer, Long> checkpoint) {
+    final StringBuilder checkpointStr = new StringBuilder();
+    checkpointStr.append(topic);
+    for (int i = 0; i < checkpoint.size(); ++i) {
+      checkpointStr.append(",").append(i).append(":").append(checkpoint.get(i));
+    }
+    return checkpointStr.toString();
+  }
+
+  /**
+   * List file status recursively.
+   * Only directories accepted by {@link KafkaConnectPathFilter#acceptDir(Path)} are descended
+   * into; directories are never reported as data files.
+   *
+   * @param curPath Current Path
+   * @param filter PathFilter
+   * @return All file status match kafka connect naming convention
+   * @throws IOException if listing a directory fails
+   */
+  private ArrayList<FileStatus> listAllFileStatus(Path curPath,
+                                                  KafkaConnectPathFilter filter) throws IOException {
+    ArrayList<FileStatus> allFileStatus = new ArrayList<>();
+    FileStatus[] fileStatus = this.fs.listStatus(curPath);
+    for (FileStatus status : fileStatus) {
+      if (status.isDirectory()) {
+        // Skip non-partition directories (e.g. temporary folders) entirely.
+        if (filter.acceptDir(status.getPath())) {
+          allFileStatus.addAll(listAllFileStatus(status.getPath(), filter));
+        }
+      } else if (filter.accept(status.getPath())) {
+        allFileStatus.add(status);
+      }
+    }
+    return allFileStatus;
+  }
+
+  /**
+   * Scan the base path and build the checkpoint from the data file names.
+   * The topic name is taken from the first matching file.
+   *
+   * @return Checkpoint string in the form {@code topic,0:offset0,1:offset1,...}
+   * @throws HoodieException if listing fails, no matching file exists, a file name carries a
+   *         partition or offset that cannot be parsed, or a partition id in
+   *         {@code [0, maxPartition]} has no file
+   */
+  @Override
+  public String getCheckpoint() throws HoodieException {
+    final KafkaConnectPathFilter filter = new KafkaConnectPathFilter();
+    ArrayList<FileStatus> fileStatus;
+    try {
+      fileStatus = listAllFileStatus(this.path, filter);
+    } catch (IOException e) {
+      // Keep the original exception as cause so the stack trace is not lost.
+      throw new HoodieException("Failed to list Kafka Connect Hdfs files under: " + this.path, e);
+    }
+    if (fileStatus.isEmpty()) {
+      throw new HoodieException("No valid Kafka Connect Hdfs file found under:" + this.path.getName());
+    }
+    String topic = null;
+    int maxPartition = -1;
+    final HashMap<Integer, Long> checkpointMap = new HashMap<>();
+    for (final FileStatus status : fileStatus) {
+      final String filename = status.getPath().getName();
+      // Parse with the same pattern used for filtering; splitting on '.' would break
+      // topic names that contain dots.
+      final Matcher matcher = KafkaConnectPathFilter.PATTERN.matcher(filename);
+      if (!matcher.matches()) {
+        throw new HoodieException("Unexpected Kafka Connect Hdfs file name: " + filename);
+      }
+      if (topic == null) {
+        topic = matcher.group(1);
+      }
+      int partition;
+      long offsetUpper;
+      try {
+        partition = Integer.parseInt(matcher.group(2));
+        // Kafka offsets are 64-bit values.
+        offsetUpper = Long.parseLong(matcher.group(4));
+      } catch (NumberFormatException e) {
+        throw new HoodieException(
+            "Invalid partition or offset in Kafka Connect Hdfs file name: " + filename, e);
+      }
+      maxPartition = Math.max(maxPartition, partition);
+      final Long previous = checkpointMap.get(partition);
+      if (previous == null || previous < offsetUpper) {
+        checkpointMap.put(partition, offsetUpper);
+      }
+    }
+    if (checkpointMap.size() != maxPartition + 1) {
+      throw new HoodieException("Missing partition from the file scan, "
+          + "max partition found(start from 0): "
+          + maxPartition
+          + " total partitions number appear in "
+          + this.path.getName()
+          + " is: "
+          + checkpointMap.size()
+          + " total partitions number expected: "
+          + (maxPartition + 1));
+    }
+    return buildCheckpointStr(topic, checkpointMap);
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestKafkaConnectHdfsProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestKafkaConnectHdfsProvider.java
new file mode 100644
index 0000000..fed8e46
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestKafkaConnectHdfsProvider.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.checkpointing;
+
+import org.apache.hudi.common.HoodieCommonTestHarness;
+import org.apache.hudi.common.model.HoodieTestUtils;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link KafkaConnectHdfsProvider} against directory layouts created on the local
+ * file system below the test harness base path.
+ */
+public class TestKafkaConnectHdfsProvider extends HoodieCommonTestHarness {
+  private FileSystem fs = null;
+  private String topicPath = null;
+
+  @Before
+  public void init() {
+    // Prepare directories
+    initPath();
+    final Configuration hadoopConf = HoodieTestUtils.getDefaultHadoopConf();
+    fs = FSUtils.getFs(basePath, hadoopConf);
+  }
+
+  /**
+   * Files of two partitions spread over two day directories, plus noise files and a TMP folder,
+   * must yield the highest upper offset per partition.
+   */
+  @Test
+  public void testValidKafkaConnectPath() throws Exception {
+    // a standard format(time based partition) of the files managed by kafka connect is:
+    // topic/year=xxx/month=xxx/day=xxx/topic+partition+lowerOffset+upperOffset.file
+    topicPath = basePath + "/topic1";
+    new File(topicPath).mkdirs();
+    // create regular kafka connect hdfs dirs
+    new File(topicPath + "/year=2016/month=05/day=01/").mkdirs();
+    new File(topicPath + "/year=2016/month=05/day=02/").mkdirs();
+    // kafka connect tmp folder; its name has no '=' so it must not be descended into
+    new File(topicPath + "/TMP").mkdirs();
+    // tmp file that being written; offset 400 must not show up in the checkpoint
+    new File(topicPath + "/TMP/" + "topic1+0+301+400.parquet").createNewFile();
+    // regular parquet files
+    new File(topicPath + "/year=2016/month=05/day=01/"
+        + "topic1+0+100+200.parquet").createNewFile();
+    new File(topicPath + "/year=2016/month=05/day=01/"
+        + "topic1+1+100+200.parquet").createNewFile();
+    new File(topicPath + "/year=2016/month=05/day=02/"
+        + "topic1+0+201+300.parquet").createNewFile();
+    // noise parquet file
+    new File(topicPath + "/year=2016/month=05/day=01/"
+        + "random_snappy_1.parquet").createNewFile();
+    new File(topicPath + "/year=2016/month=05/day=02/"
+        + "random_snappy_2.parquet").createNewFile();
+    InitialCheckPointProvider provider = new KafkaConnectHdfsProvider(new Path(topicPath), fs);
+    // JUnit expects (expected, actual); keeping this order makes failure messages accurate.
+    assertEquals("topic1,0:300,1:200", provider.getCheckpoint());
+  }
+
+  /**
+   * Partition 1 has no file while partitions 0 and 2 do, so building the checkpoint must fail.
+   */
+  @Test(expected = HoodieException.class)
+  public void testMissingPartition() throws Exception {
+    topicPath = basePath + "/topic2";
+    new File(topicPath).mkdirs();
+    // create regular kafka connect hdfs dirs
+    new File(topicPath + "/year=2016/month=05/day=01/").mkdirs();
+    new File(topicPath + "/year=2016/month=05/day=02/").mkdirs();
+    // parquet files with missing partition
+    new File(topicPath + "/year=2016/month=05/day=01/"
+        + "topic1+0+100+200.parquet").createNewFile();
+    new File(topicPath + "/year=2016/month=05/day=01/"
+        + "topic1+2+100+200.parquet").createNewFile();
+    new File(topicPath + "/year=2016/month=05/day=02/"
+        + "topic1+0+201+300.parquet").createNewFile();
+    InitialCheckPointProvider provider = new KafkaConnectHdfsProvider(new Path(topicPath), fs);
+    provider.getCheckpoint();
+  }
+}